f45a612ca008cf63bca6fc0d74abb91d6eadacc4
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use Scalar::Util qw/refaddr weaken reftype blessed/;
12 use List::Util qw/first/;
13 use Sub::Name 'subname';
14 use Context::Preserve 'preserve_context';
15 use Try::Tiny;
16 use overload ();
17 use Data::Compare (); # no imports!!! guard against insane architecture
18 use namespace::clean;
19
20 # default cursor class, overridable in connect_info attributes
21 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
22
23 __PACKAGE__->mk_group_accessors('inherited' => qw/
24   sql_limit_dialect sql_quote_char sql_name_sep
25 /);
26
27 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
28
29 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
30 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
31
32 __PACKAGE__->sql_name_sep('.');
33
34 __PACKAGE__->mk_group_accessors('simple' => qw/
35   _connect_info _dbic_connect_attributes _driver_determined
36   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts _dbh_autocommit
37   _perform_autoinc_retrieval _autoinc_supplied_for_op
38 /);
39
40 # the values for these accessors are picked out (and deleted) from
41 # the attribute hashref passed to connect_info
42 my @storage_options = qw/
43   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
44   disable_sth_caching unsafe auto_savepoint
45 /;
46 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
47
48
49 # capability definitions, using a 2-tiered accessor system
50 # The rationale is:
51 #
52 # A driver/user may define _use_X, which blindly without any checks says:
53 # "(do not) use this capability", (use_dbms_capability is an "inherited"
54 # type accessor)
55 #
56 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
57 # accessor, which in turn calls _determine_supports_X, and stores the return
58 # in a special slot on the storage object, which is wiped every time a $dbh
59 # reconnection takes place (it is not guaranteed that upon reconnection we
60 # will get the same rdbms version). _determine_supports_X does not need to
61 # exist on a driver, as we ->can for it before calling.
62
63 my @capabilities = (qw/
64   insert_returning
65   insert_returning_bound
66
67   multicolumn_in
68
69   placeholders
70   typeless_placeholders
71
72   join_optimizer
73 /);
74 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
75 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
76
77 # on by default, not strictly a capability (pending rewrite)
78 __PACKAGE__->_use_join_optimizer (1);
79 sub _determine_supports_join_optimizer { 1 };
80
81 # Each of these methods need _determine_driver called before itself
82 # in order to function reliably. This is a purely DRY optimization
83 #
84 # get_(use)_dbms_capability need to be called on the correct Storage
85 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
86 # _determine_supports_X which obv. needs a correct driver as well
87 my @rdbms_specific_methods = qw/
88   sqlt_type
89   deployment_statements
90
91   sql_maker
92   cursor_class
93
94   build_datetime_parser
95   datetime_parser_type
96
97   txn_begin
98
99   insert
100   insert_bulk
101   update
102   delete
103   select
104   select_single
105
106   with_deferred_fk_checks
107
108   get_use_dbms_capability
109   get_dbms_capability
110
111   _server_info
112   _get_server_version
113 /;
114
115 for my $meth (@rdbms_specific_methods) {
116
117   my $orig = __PACKAGE__->can ($meth)
118     or die "$meth is not a ::Storage::DBI method!";
119
120   no strict qw/refs/;
121   no warnings qw/redefine/;
122   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
123     if (
124       # only fire when invoked on an instance, a valid class-based invocation
125       # would e.g. be setting a default for an inherited accessor
126       ref $_[0]
127         and
128       ! $_[0]->{_driver_determined}
129         and
130       ! $_[0]->{_in_determine_driver}
131         and
132       # Only try to determine stuff if we have *something* that either is or can
133       # provide a DSN. Allows for bare $schema's generated with a plain ->connect()
134       # to still be marginally useful
135       $_[0]->_dbi_connect_info->[0]
136     ) {
137       $_[0]->_determine_driver;
138
139       # This for some reason crashes and burns on perl 5.8.1
140       # IFF the method ends up throwing an exception
141       #goto $_[0]->can ($meth);
142
143       my $cref = $_[0]->can ($meth);
144       goto $cref;
145     }
146
147     goto $orig;
148   };
149 }
150
151 =head1 NAME
152
153 DBIx::Class::Storage::DBI - DBI storage handler
154
155 =head1 SYNOPSIS
156
157   my $schema = MySchema->connect('dbi:SQLite:my.db');
158
159   $schema->storage->debug(1);
160
161   my @stuff = $schema->storage->dbh_do(
162     sub {
163       my ($storage, $dbh, @args) = @_;
164       $dbh->do("DROP TABLE authors");
165     },
166     @column_list
167   );
168
169   $schema->resultset('Book')->search({
170      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
171   });
172
173 =head1 DESCRIPTION
174
175 This class represents the connection to an RDBMS via L<DBI>.  See
176 L<DBIx::Class::Storage> for general information.  This pod only
177 documents DBI-specific methods and behaviors.
178
179 =head1 METHODS
180
181 =cut
182
183 sub new {
184   my $new = shift->next::method(@_);
185
186   $new->_sql_maker_opts({});
187   $new->_dbh_details({});
188   $new->{_in_do_block} = 0;
189
190   # read below to see what this does
191   $new->_arm_global_destructor;
192
193   $new;
194 }
195
196 # This is hack to work around perl shooting stuff in random
197 # order on exit(). If we do not walk the remaining storage
198 # objects in an END block, there is a *small but real* chance
199 # of a fork()ed child to kill the parent's shared DBI handle,
200 # *before perl reaches the DESTROY in this package*
201 # Yes, it is ugly and effective.
202 # Additionally this registry is used by the CLONE method to
203 # make sure no handles are shared between threads
204 {
205   my %seek_and_destroy;
206
207   sub _arm_global_destructor {
208     weaken (
209       $seek_and_destroy{ refaddr($_[0]) } = $_[0]
210     );
211   }
212
213   END {
214     local $?; # just in case the DBI destructor changes it somehow
215
216     # destroy just the object if not native to this process
217     $_->_verify_pid for (grep
218       { defined $_ }
219       values %seek_and_destroy
220     );
221   }
222
223   sub CLONE {
224     # As per DBI's recommendation, DBIC disconnects all handles as
225     # soon as possible (DBIC will reconnect only on demand from within
226     # the thread)
227     my @instances = grep { defined $_ } values %seek_and_destroy;
228     %seek_and_destroy = ();
229
230     for (@instances) {
231       $_->_dbh(undef);
232
233       $_->transaction_depth(0);
234       $_->savepoints([]);
235
236       # properly renumber existing refs
237       $_->_arm_global_destructor
238     }
239   }
240 }
241
242 sub DESTROY {
243   my $self = shift;
244
245   # some databases spew warnings on implicit disconnect
246   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
247   local $SIG{__WARN__} = sub {};
248   $self->_dbh(undef);
249
250   # this op is necessary, since the very last perl runtime statement
251   # triggers a global destruction shootout, and the $SIG localization
252   # may very well be destroyed before perl actually gets to do the
253   # $dbh undef
254   1;
255 }
256
257 # handle pid changes correctly - do not destroy parent's connection
258 sub _verify_pid {
259   my $self = shift;
260
261   my $pid = $self->_conn_pid;
262   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
263     $dbh->{InactiveDestroy} = 1;
264     $self->_dbh(undef);
265     $self->transaction_depth(0);
266     $self->savepoints([]);
267   }
268
269   return;
270 }
271
272 =head2 connect_info
273
274 This method is normally called by L<DBIx::Class::Schema/connection>, which
275 encapsulates its argument list in an arrayref before passing them here.
276
277 The argument list may contain:
278
279 =over
280
281 =item *
282
283 The same 4-element argument set one would normally pass to
284 L<DBI/connect>, optionally followed by
285 L<extra attributes|/DBIx::Class specific connection attributes>
286 recognized by DBIx::Class:
287
288   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
289
290 =item *
291
292 A single code reference which returns a connected
293 L<DBI database handle|DBI/connect> optionally followed by
294 L<extra attributes|/DBIx::Class specific connection attributes> recognized
295 by DBIx::Class:
296
297   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
298
299 =item *
300
301 A single hashref with all the attributes and the dsn/user/password
302 mixed together:
303
304   $connect_info_args = [{
305     dsn => $dsn,
306     user => $user,
307     password => $pass,
308     %dbi_attributes,
309     %extra_attributes,
310   }];
311
312   $connect_info_args = [{
313     dbh_maker => sub { DBI->connect (...) },
314     %dbi_attributes,
315     %extra_attributes,
316   }];
317
318 This is particularly useful for L<Catalyst> based applications, allowing the
319 following config (L<Config::General> style):
320
321   <Model::DB>
322     schema_class   App::DB
323     <connect_info>
324       dsn          dbi:mysql:database=test
325       user         testuser
326       password     TestPass
327       AutoCommit   1
328     </connect_info>
329   </Model::DB>
330
331 The C<dsn>/C<user>/C<password> combination can be substituted by the
332 C<dbh_maker> key whose value is a coderef that returns a connected
333 L<DBI database handle|DBI/connect>
334
335 =back
336
337 Please note that the L<DBI> docs recommend that you always explicitly
338 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
339 recommends that it be set to I<1>, and that you perform transactions
340 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
341 to I<1> if you do not do explicitly set it to zero.  This is the default
342 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
343
344 =head3 DBIx::Class specific connection attributes
345
346 In addition to the standard L<DBI|DBI/ATTRIBUTES COMMON TO ALL HANDLES>
347 L<connection|DBI/Database Handle Attributes> attributes, DBIx::Class recognizes
348 the following connection options. These options can be mixed in with your other
349 L<DBI> connection attributes, or placed in a separate hashref
350 (C<\%extra_attributes>) as shown above.
351
352 Every time C<connect_info> is invoked, any previous settings for
353 these options will be cleared before setting the new ones, regardless of
354 whether any options are specified in the new C<connect_info>.
355
356
357 =over
358
359 =item on_connect_do
360
361 Specifies things to do immediately after connecting or re-connecting to
362 the database.  Its value may contain:
363
364 =over
365
366 =item a scalar
367
368 This contains one SQL statement to execute.
369
370 =item an array reference
371
372 This contains SQL statements to execute in order.  Each element contains
373 a string or a code reference that returns a string.
374
375 =item a code reference
376
377 This contains some code to execute.  Unlike code references within an
378 array reference, its return value is ignored.
379
380 =back
381
382 =item on_disconnect_do
383
384 Takes arguments in the same form as L</on_connect_do> and executes them
385 immediately before disconnecting from the database.
386
387 Note, this only runs if you explicitly call L</disconnect> on the
388 storage object.
389
390 =item on_connect_call
391
392 A more generalized form of L</on_connect_do> that calls the specified
393 C<connect_call_METHOD> methods in your storage driver.
394
395   on_connect_do => 'select 1'
396
397 is equivalent to:
398
399   on_connect_call => [ [ do_sql => 'select 1' ] ]
400
401 Its values may contain:
402
403 =over
404
405 =item a scalar
406
407 Will call the C<connect_call_METHOD> method.
408
409 =item a code reference
410
411 Will execute C<< $code->($storage) >>
412
413 =item an array reference
414
415 Each value can be a method name or code reference.
416
417 =item an array of arrays
418
419 For each array, the first item is taken to be the C<connect_call_> method name
420 or code reference, and the rest are parameters to it.
421
422 =back
423
424 Some predefined storage methods you may use:
425
426 =over
427
428 =item do_sql
429
430 Executes a SQL string or a code reference that returns a SQL string. This is
431 what L</on_connect_do> and L</on_disconnect_do> use.
432
433 It can take:
434
435 =over
436
437 =item a scalar
438
439 Will execute the scalar as SQL.
440
441 =item an arrayref
442
443 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
444 attributes hashref and bind values.
445
446 =item a code reference
447
448 Will execute C<< $code->($storage) >> and execute the return array refs as
449 above.
450
451 =back
452
453 =item datetime_setup
454
455 Execute any statements necessary to initialize the database session to return
456 and accept datetime/timestamp values used with
457 L<DBIx::Class::InflateColumn::DateTime>.
458
459 Only necessary for some databases, see your specific storage driver for
460 implementation details.
461
462 =back
463
464 =item on_disconnect_call
465
466 Takes arguments in the same form as L</on_connect_call> and executes them
467 immediately before disconnecting from the database.
468
469 Calls the C<disconnect_call_METHOD> methods as opposed to the
470 C<connect_call_METHOD> methods called by L</on_connect_call>.
471
472 Note, this only runs if you explicitly call L</disconnect> on the
473 storage object.
474
475 =item disable_sth_caching
476
477 If set to a true value, this option will disable the caching of
478 statement handles via L<DBI/prepare_cached>.
479
480 =item limit_dialect
481
482 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
483 default L</sql_limit_dialect> setting of the storage (if any). For a list
484 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
485
486 =item quote_names
487
488 When true automatically sets L</quote_char> and L</name_sep> to the characters
489 appropriate for your particular RDBMS. This option is preferred over specifying
490 L</quote_char> directly.
491
492 =item quote_char
493
494 Specifies what characters to use to quote table and column names.
495
496 C<quote_char> expects either a single character, in which case is it
497 is placed on either side of the table/column name, or an arrayref of length
498 2 in which case the table/column name is placed between the elements.
499
500 For example under MySQL you should use C<< quote_char => '`' >>, and for
501 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
502
503 =item name_sep
504
505 This parameter is only useful in conjunction with C<quote_char>, and is used to
506 specify the character that separates elements (schemas, tables, columns) from
507 each other. If unspecified it defaults to the most commonly used C<.>.
508
509 =item unsafe
510
511 This Storage driver normally installs its own C<HandleError>, sets
512 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
513 all database handles, including those supplied by a coderef.  It does this
514 so that it can have consistent and useful error behavior.
515
516 If you set this option to a true value, Storage will not do its usual
517 modifications to the database handle's attributes, and instead relies on
518 the settings in your connect_info DBI options (or the values you set in
519 your connection coderef, in the case that you are connecting via coderef).
520
521 Note that your custom settings can cause Storage to malfunction,
522 especially if you set a C<HandleError> handler that suppresses exceptions
523 and/or disable C<RaiseError>.
524
525 =item auto_savepoint
526
527 If this option is true, L<DBIx::Class> will use savepoints when nesting
528 transactions, making it possible to recover from failure in the inner
529 transaction without having to abort all outer transactions.
530
531 =item cursor_class
532
533 Use this argument to supply a cursor class other than the default
534 L<DBIx::Class::Storage::DBI::Cursor>.
535
536 =back
537
538 Some real-life examples of arguments to L</connect_info> and
539 L<DBIx::Class::Schema/connect>
540
541   # Simple SQLite connection
542   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
543
544   # Connect via subref
545   ->connect_info([ sub { DBI->connect(...) } ]);
546
547   # Connect via subref in hashref
548   ->connect_info([{
549     dbh_maker => sub { DBI->connect(...) },
550     on_connect_do => 'alter session ...',
551   }]);
552
553   # A bit more complicated
554   ->connect_info(
555     [
556       'dbi:Pg:dbname=foo',
557       'postgres',
558       'my_pg_password',
559       { AutoCommit => 1 },
560       { quote_char => q{"} },
561     ]
562   );
563
564   # Equivalent to the previous example
565   ->connect_info(
566     [
567       'dbi:Pg:dbname=foo',
568       'postgres',
569       'my_pg_password',
570       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
571     ]
572   );
573
574   # Same, but with hashref as argument
575   # See parse_connect_info for explanation
576   ->connect_info(
577     [{
578       dsn         => 'dbi:Pg:dbname=foo',
579       user        => 'postgres',
580       password    => 'my_pg_password',
581       AutoCommit  => 1,
582       quote_char  => q{"},
583       name_sep    => q{.},
584     }]
585   );
586
587   # Subref + DBIx::Class-specific connection options
588   ->connect_info(
589     [
590       sub { DBI->connect(...) },
591       {
592           quote_char => q{`},
593           name_sep => q{@},
594           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
595           disable_sth_caching => 1,
596       },
597     ]
598   );
599
600
601
602 =cut
603
604 sub connect_info {
605   my ($self, $info) = @_;
606
607   return $self->_connect_info if !$info;
608
609   $self->_connect_info($info); # copy for _connect_info
610
611   $info = $self->_normalize_connect_info($info)
612     if ref $info eq 'ARRAY';
613
614   my %attrs = (
615     %{ $self->_default_dbi_connect_attributes || {} },
616     %{ $info->{attributes} || {} },
617   );
618
619   my @args = @{ $info->{arguments} };
620
621   if (keys %attrs and ref $args[0] ne 'CODE') {
622     carp_unique (
623         'You provided explicit AutoCommit => 0 in your connection_info. '
624       . 'This is almost universally a bad idea (see the footnotes of '
625       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
626       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
627       . 'this warning.'
628     ) if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
629
630     push @args, \%attrs if keys %attrs;
631   }
632
633   # this is the authoritative "always an arrayref" thing fed to DBI->connect
634   # OR a single-element coderef-based $dbh factory
635   $self->_dbi_connect_info(\@args);
636
637   # extract the individual storage options
638   for my $storage_opt (keys %{ $info->{storage_options} }) {
639     my $value = $info->{storage_options}{$storage_opt};
640
641     $self->$storage_opt($value);
642   }
643
644   # Extract the individual sqlmaker options
645   #
646   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
647   #  the new set of options
648   $self->_sql_maker(undef);
649   $self->_sql_maker_opts({});
650
651   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
652     my $value = $info->{sql_maker_options}{$sql_maker_opt};
653
654     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
655   }
656
657   # FIXME - dirty:
658   # save attributes in a separate accessor so they are always
659   # introspectable, even in case of a CODE $dbhmaker
660   $self->_dbic_connect_attributes (\%attrs);
661
662   return $self->_connect_info;
663 }
664
665 sub _dbi_connect_info {
666   my $self = shift;
667
668   return $self->{_dbi_connect_info} = $_[0]
669     if @_;
670
671   my $conninfo = $self->{_dbi_connect_info} || [];
672
673   # last ditch effort to grab a DSN
674   if ( ! defined $conninfo->[0] and $ENV{DBI_DSN} ) {
675     my @new_conninfo = @$conninfo;
676     $new_conninfo[0] = $ENV{DBI_DSN};
677     $conninfo = \@new_conninfo;
678   }
679
680   return $conninfo;
681 }
682
683
684 sub _normalize_connect_info {
685   my ($self, $info_arg) = @_;
686   my %info;
687
688   my @args = @$info_arg;  # take a shallow copy for further mutilation
689
690   # combine/pre-parse arguments depending on invocation style
691
692   my %attrs;
693   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
694     %attrs = %{ $args[1] || {} };
695     @args = $args[0];
696   }
697   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
698     %attrs = %{$args[0]};
699     @args = ();
700     if (my $code = delete $attrs{dbh_maker}) {
701       @args = $code;
702
703       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
704       if (@ignored) {
705         carp sprintf (
706             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
707           . "to the result of 'dbh_maker'",
708
709           join (', ', map { "'$_'" } (@ignored) ),
710         );
711       }
712     }
713     else {
714       @args = delete @attrs{qw/dsn user password/};
715     }
716   }
717   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
718     %attrs = (
719       % { $args[3] || {} },
720       % { $args[4] || {} },
721     );
722     @args = @args[0,1,2];
723   }
724
725   $info{arguments} = \@args;
726
727   my @storage_opts = grep exists $attrs{$_},
728     @storage_options, 'cursor_class';
729
730   @{ $info{storage_options} }{@storage_opts} =
731     delete @attrs{@storage_opts} if @storage_opts;
732
733   my @sql_maker_opts = grep exists $attrs{$_},
734     qw/limit_dialect quote_char name_sep quote_names/;
735
736   @{ $info{sql_maker_options} }{@sql_maker_opts} =
737     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
738
739   $info{attributes} = \%attrs if %attrs;
740
741   return \%info;
742 }
743
744 sub _default_dbi_connect_attributes () {
745   +{
746     AutoCommit => 1,
747     PrintError => 0,
748     RaiseError => 1,
749     ShowErrorStatement => 1,
750   };
751 }
752
753 =head2 on_connect_do
754
755 This method is deprecated in favour of setting via L</connect_info>.
756
757 =cut
758
759 =head2 on_disconnect_do
760
761 This method is deprecated in favour of setting via L</connect_info>.
762
763 =cut
764
765 sub _parse_connect_do {
766   my ($self, $type) = @_;
767
768   my $val = $self->$type;
769   return () if not defined $val;
770
771   my @res;
772
773   if (not ref($val)) {
774     push @res, [ 'do_sql', $val ];
775   } elsif (ref($val) eq 'CODE') {
776     push @res, $val;
777   } elsif (ref($val) eq 'ARRAY') {
778     push @res, map { [ 'do_sql', $_ ] } @$val;
779   } else {
780     $self->throw_exception("Invalid type for $type: ".ref($val));
781   }
782
783   return \@res;
784 }
785
786 =head2 dbh_do
787
788 Arguments: ($subref | $method_name), @extra_coderef_args?
789
790 Execute the given $subref or $method_name using the new exception-based
791 connection management.
792
793 The first two arguments will be the storage object that C<dbh_do> was called
794 on and a database handle to use.  Any additional arguments will be passed
795 verbatim to the called subref as arguments 2 and onwards.
796
797 Using this (instead of $self->_dbh or $self->dbh) ensures correct
798 exception handling and reconnection (or failover in future subclasses).
799
800 Your subref should have no side-effects outside of the database, as
801 there is the potential for your subref to be partially double-executed
802 if the database connection was stale/dysfunctional.
803
804 Example:
805
806   my @stuff = $schema->storage->dbh_do(
807     sub {
808       my ($storage, $dbh, @cols) = @_;
809       my $cols = join(q{, }, @cols);
810       $dbh->selectrow_array("SELECT $cols FROM foo");
811     },
812     @column_list
813   );
814
815 =cut
816
817 sub dbh_do {
818   my $self = shift;
819   my $run_target = shift;
820
821   # short circuit when we know there is no need for a runner
822   #
823   # FIXME - assumption may be wrong
824   # the rationale for the txn_depth check is that if this block is a part
825   # of a larger transaction, everything up to that point is screwed anyway
826   return $self->$run_target($self->_get_dbh, @_)
827     if $self->{_in_do_block} or $self->transaction_depth;
828
829   # take a ref instead of a copy, to preserve @_ aliasing
830   # semantics within the coderef, but only if needed
831   # (pseudoforking doesn't like this trick much)
832   my $args = @_ ? \@_ : [];
833
834   DBIx::Class::Storage::BlockRunner->new(
835     storage => $self,
836     run_code => sub { $self->$run_target ($self->_get_dbh, @$args ) },
837     wrap_txn => 0,
838     retry_handler => sub { ! ( $_[0]->retried_count or $_[0]->storage->connected ) },
839   )->run;
840 }
841
842 sub txn_do {
843   $_[0]->_get_dbh; # connects or reconnects on pid change, necessary to grab correct txn_depth
844   shift->next::method(@_);
845 }
846
847 =head2 disconnect
848
849 Our C<disconnect> method also performs a rollback first if the
850 database is not in C<AutoCommit> mode.
851
852 =cut
853
854 sub disconnect {
855   my ($self) = @_;
856
857   if( $self->_dbh ) {
858     my @actions;
859
860     push @actions, ( $self->on_disconnect_call || () );
861     push @actions, $self->_parse_connect_do ('on_disconnect_do');
862
863     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
864
865     # stops the "implicit rollback on disconnect" warning
866     $self->_exec_txn_rollback unless $self->_dbh_autocommit;
867
868     %{ $self->_dbh->{CachedKids} } = ();
869     $self->_dbh->disconnect;
870     $self->_dbh(undef);
871   }
872 }
873
874 =head2 with_deferred_fk_checks
875
876 =over 4
877
878 =item Arguments: C<$coderef>
879
880 =item Return Value: The return value of $coderef
881
882 =back
883
884 Storage specific method to run the code ref with FK checks deferred or
885 in MySQL's case disabled entirely.
886
887 =cut
888
889 # Storage subclasses should override this
890 sub with_deferred_fk_checks {
891   my ($self, $sub) = @_;
892   $sub->();
893 }
894
895 =head2 connected
896
897 =over
898
899 =item Arguments: none
900
901 =item Return Value: 1|0
902
903 =back
904
905 Verifies that the current database handle is active and ready to execute
906 an SQL statement (e.g. the connection did not get stale, server is still
907 answering, etc.) This method is used internally by L</dbh>.
908
909 =cut
910
911 sub connected {
912   my $self = shift;
913   return 0 unless $self->_seems_connected;
914
915   #be on the safe side
916   local $self->_dbh->{RaiseError} = 1;
917
918   return $self->_ping;
919 }
920
921 sub _seems_connected {
922   my $self = shift;
923
924   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
925
926   my $dbh = $self->_dbh
927     or return 0;
928
929   return $dbh->FETCH('Active');
930 }
931
932 sub _ping {
933   my $self = shift;
934
935   my $dbh = $self->_dbh or return 0;
936
937   return $dbh->ping;
938 }
939
940 sub ensure_connected {
941   my ($self) = @_;
942
943   unless ($self->connected) {
944     $self->_populate_dbh;
945   }
946 }
947
948 =head2 dbh
949
950 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
951 is guaranteed to be healthy by implicitly calling L</connected>, and if
952 necessary performing a reconnection before returning. Keep in mind that this
953 is very B<expensive> on some database engines. Consider using L</dbh_do>
954 instead.
955
956 =cut
957
958 sub dbh {
959   my ($self) = @_;
960
961   if (not $self->_dbh) {
962     $self->_populate_dbh;
963   } else {
964     $self->ensure_connected;
965   }
966   return $self->_dbh;
967 }
968
969 # this is the internal "get dbh or connect (don't check)" method
970 sub _get_dbh {
971   my $self = shift;
972   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
973   $self->_populate_dbh unless $self->_dbh;
974   return $self->_dbh;
975 }
976
977 sub sql_maker {
978   my ($self) = @_;
979   unless ($self->_sql_maker) {
980     my $sql_maker_class = $self->sql_maker_class;
981
982     my %opts = %{$self->_sql_maker_opts||{}};
983     my $dialect =
984       $opts{limit_dialect}
985         ||
986       $self->sql_limit_dialect
987         ||
988       do {
989         my $s_class = (ref $self) || $self;
990         carp_unique (
991           "Your storage class ($s_class) does not set sql_limit_dialect and you "
992         . 'have not supplied an explicit limit_dialect in your connection_info. '
993         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
994         . 'databases but can be (and often is) painfully slow. '
995         . "Please file an RT ticket against '$s_class'"
996         ) if $self->_dbi_connect_info->[0];
997
998         'GenericSubQ';
999       }
1000     ;
1001
1002     my ($quote_char, $name_sep);
1003
1004     if ($opts{quote_names}) {
1005       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
1006         my $s_class = (ref $self) || $self;
1007         carp_unique (
1008           "You requested 'quote_names' but your storage class ($s_class) does "
1009         . 'not explicitly define a default sql_quote_char and you have not '
1010         . 'supplied a quote_char as part of your connection_info. DBIC will '
1011         .q{default to the ANSI SQL standard quote '"', which works most of }
1012         . "the time. Please file an RT ticket against '$s_class'."
1013         );
1014
1015         '"'; # RV
1016       };
1017
1018       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
1019     }
1020
1021     $self->_sql_maker($sql_maker_class->new(
1022       bindtype=>'columns',
1023       array_datatypes => 1,
1024       limit_dialect => $dialect,
1025       ($quote_char ? (quote_char => $quote_char) : ()),
1026       name_sep => ($name_sep || '.'),
1027       %opts,
1028     ));
1029   }
1030   return $self->_sql_maker;
1031 }
1032
1033 # nothing to do by default
1034 sub _rebless {}
1035 sub _init {}
1036
1037 sub _populate_dbh {
1038   my ($self) = @_;
1039
1040   $self->_dbh(undef); # in case ->connected failed we might get sent here
1041   $self->_dbh_details({}); # reset everything we know
1042
1043   $self->_dbh($self->_connect);
1044
1045   $self->_conn_pid($$) unless DBIx::Class::_ENV_::BROKEN_FORK; # on win32 these are in fact threads
1046
1047   $self->_determine_driver;
1048
1049   # Always set the transaction depth on connect, since
1050   #  there is no transaction in progress by definition
1051   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1052
1053   $self->_run_connection_actions unless $self->{_in_determine_driver};
1054 }
1055
1056 sub _run_connection_actions {
1057   my $self = shift;
1058   my @actions;
1059
1060   push @actions, ( $self->on_connect_call || () );
1061   push @actions, $self->_parse_connect_do ('on_connect_do');
1062
1063   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1064 }
1065
1066
1067
1068 sub set_use_dbms_capability {
1069   $_[0]->set_inherited ($_[1], $_[2]);
1070 }
1071
1072 sub get_use_dbms_capability {
1073   my ($self, $capname) = @_;
1074
1075   my $use = $self->get_inherited ($capname);
1076   return defined $use
1077     ? $use
1078     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1079   ;
1080 }
1081
1082 sub set_dbms_capability {
1083   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1084 }
1085
1086 sub get_dbms_capability {
1087   my ($self, $capname) = @_;
1088
1089   my $cap = $self->_dbh_details->{capability}{$capname};
1090
1091   unless (defined $cap) {
1092     if (my $meth = $self->can ("_determine$capname")) {
1093       $cap = $self->$meth ? 1 : 0;
1094     }
1095     else {
1096       $cap = 0;
1097     }
1098
1099     $self->set_dbms_capability ($capname, $cap);
1100   }
1101
1102   return $cap;
1103 }
1104
1105 sub _server_info {
1106   my $self = shift;
1107
1108   my $info;
1109   unless ($info = $self->_dbh_details->{info}) {
1110
1111     $info = {};
1112
1113     my $server_version = try {
1114       $self->_get_server_version
1115     } catch {
1116       # driver determination *may* use this codepath
1117       # in which case we must rethrow
1118       $self->throw_exception($_) if $self->{_in_determine_driver};
1119
1120       # $server_version on failure
1121       undef;
1122     };
1123
1124     if (defined $server_version) {
1125       $info->{dbms_version} = $server_version;
1126
1127       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1128       my @verparts = split (/\./, $numeric_version);
1129       if (
1130         @verparts
1131           &&
1132         $verparts[0] <= 999
1133       ) {
1134         # consider only up to 3 version parts, iff not more than 3 digits
1135         my @use_parts;
1136         while (@verparts && @use_parts < 3) {
1137           my $p = shift @verparts;
1138           last if $p > 999;
1139           push @use_parts, $p;
1140         }
1141         push @use_parts, 0 while @use_parts < 3;
1142
1143         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1144       }
1145     }
1146
1147     $self->_dbh_details->{info} = $info;
1148   }
1149
1150   return $info;
1151 }
1152
1153 sub _get_server_version {
1154   shift->_dbh_get_info('SQL_DBMS_VER');
1155 }
1156
1157 sub _dbh_get_info {
1158   my ($self, $info) = @_;
1159
1160   if ($info =~ /[^0-9]/) {
1161     require DBI::Const::GetInfoType;
1162     $info = $DBI::Const::GetInfoType::GetInfoType{$info};
1163     $self->throw_exception("Info type '$_[1]' not provided by DBI::Const::GetInfoType")
1164       unless defined $info;
1165   }
1166
1167   $self->_get_dbh->get_info($info);
1168 }
1169
1170 sub _describe_connection {
1171   require DBI::Const::GetInfoReturn;
1172
1173   my $self = shift;
1174
1175   my $drv;
1176   try {
1177     $drv = $self->_extract_driver_from_connect_info;
1178     $self->ensure_connected;
1179   };
1180
1181   $drv = "DBD::$drv" if $drv;
1182
1183   my $res = {
1184     DBIC_DSN => $self->_dbi_connect_info->[0],
1185     DBI_VER => DBI->VERSION,
1186     DBIC_VER => DBIx::Class->VERSION,
1187     DBIC_DRIVER => ref $self,
1188     $drv ? (
1189       DBD => $drv,
1190       DBD_VER => try { $drv->VERSION },
1191     ) : (),
1192   };
1193
1194   # try to grab data even if we never managed to connect
1195   # will cover us in cases of an oddly broken half-connect
1196   for my $inf (
1197     #keys %DBI::Const::GetInfoType::GetInfoType,
1198     qw/
1199       SQL_CURSOR_COMMIT_BEHAVIOR
1200       SQL_CURSOR_ROLLBACK_BEHAVIOR
1201       SQL_CURSOR_SENSITIVITY
1202       SQL_DATA_SOURCE_NAME
1203       SQL_DBMS_NAME
1204       SQL_DBMS_VER
1205       SQL_DEFAULT_TXN_ISOLATION
1206       SQL_DM_VER
1207       SQL_DRIVER_NAME
1208       SQL_DRIVER_ODBC_VER
1209       SQL_DRIVER_VER
1210       SQL_EXPRESSIONS_IN_ORDERBY
1211       SQL_GROUP_BY
1212       SQL_IDENTIFIER_CASE
1213       SQL_IDENTIFIER_QUOTE_CHAR
1214       SQL_MAX_CATALOG_NAME_LEN
1215       SQL_MAX_COLUMN_NAME_LEN
1216       SQL_MAX_IDENTIFIER_LEN
1217       SQL_MAX_TABLE_NAME_LEN
1218       SQL_MULTIPLE_ACTIVE_TXN
1219       SQL_MULT_RESULT_SETS
1220       SQL_NEED_LONG_DATA_LEN
1221       SQL_NON_NULLABLE_COLUMNS
1222       SQL_ODBC_VER
1223       SQL_QUALIFIER_NAME_SEPARATOR
1224       SQL_QUOTED_IDENTIFIER_CASE
1225       SQL_TXN_CAPABLE
1226       SQL_TXN_ISOLATION_OPTION
1227     /
1228   ) {
1229     # some drivers barf on things they do not know about instead
1230     # of returning undef
1231     my $v = try { $self->_dbh_get_info($inf) };
1232     next unless defined $v;
1233
1234     #my $key = sprintf( '%s(%s)', $inf, $DBI::Const::GetInfoType::GetInfoType{$inf} );
1235     my $expl = DBI::Const::GetInfoReturn::Explain($inf, $v);
1236     $res->{$inf} = DBI::Const::GetInfoReturn::Format($inf, $v) . ( $expl ? " ($expl)" : '' );
1237   }
1238
1239   $res;
1240 }
1241
1242 sub _determine_driver {
1243   my ($self) = @_;
1244
1245   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1246     my $started_connected = 0;
1247     local $self->{_in_determine_driver} = 1;
1248
1249     if (ref($self) eq __PACKAGE__) {
1250       my $driver;
1251       if ($self->_dbh) { # we are connected
1252         $driver = $self->_dbh->{Driver}{Name};
1253         $started_connected = 1;
1254       }
1255       else {
1256         $driver = $self->_extract_driver_from_connect_info;
1257       }
1258
1259       if ($driver) {
1260         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1261         if ($self->load_optional_class($storage_class)) {
1262           mro::set_mro($storage_class, 'c3');
1263           bless $self, $storage_class;
1264           $self->_rebless();
1265         }
1266         else {
1267           $self->_warn_undetermined_driver(
1268             'This version of DBIC does not yet seem to supply a driver for '
1269           . "your particular RDBMS and/or connection method ('$driver')."
1270           );
1271         }
1272       }
1273       else {
1274         $self->_warn_undetermined_driver(
1275           'Unable to extract a driver name from connect info - this '
1276         . 'should not have happened.'
1277         );
1278       }
1279     }
1280
1281     $self->_driver_determined(1);
1282
1283     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1284
1285     if ($self->can('source_bind_attributes')) {
1286       $self->throw_exception(
1287         "Your storage subclass @{[ ref $self ]} provides (or inherits) the method "
1288       . 'source_bind_attributes() for which support has been removed as of Jan 2013. '
1289       . 'If you are not sure how to proceed please contact the development team via '
1290       . 'http://search.cpan.org/dist/DBIx-Class/lib/DBIx/Class.pm#GETTING_HELP/SUPPORT'
1291       );
1292     }
1293
1294     $self->_init; # run driver-specific initializations
1295
1296     $self->_run_connection_actions
1297         if !$started_connected && defined $self->_dbh;
1298   }
1299 }
1300
1301 sub _extract_driver_from_connect_info {
1302   my $self = shift;
1303
1304   my $drv;
1305
1306   # if connect_info is a CODEREF, we have no choice but to connect
1307   if (
1308     ref $self->_dbi_connect_info->[0]
1309       and
1310     reftype $self->_dbi_connect_info->[0] eq 'CODE'
1311   ) {
1312     $self->_populate_dbh;
1313     $drv = $self->_dbh->{Driver}{Name};
1314   }
1315   else {
1316     # try to use dsn to not require being connected, the driver may still
1317     # force a connection later in _rebless to determine version
1318     # (dsn may not be supplied at all if all we do is make a mock-schema)
1319     ($drv) = ($self->_dbi_connect_info->[0] || '') =~ /^dbi:([^:]+):/i;
1320     $drv ||= $ENV{DBI_DRIVER};
1321   }
1322
1323   return $drv;
1324 }
1325
1326 sub _determine_connector_driver {
1327   my ($self, $conn) = @_;
1328
1329   my $dbtype = $self->_dbh_get_info('SQL_DBMS_NAME');
1330
1331   if (not $dbtype) {
1332     $self->_warn_undetermined_driver(
1333       'Unable to retrieve RDBMS type (SQL_DBMS_NAME) of the engine behind your '
1334     . "$conn connector - this should not have happened."
1335     );
1336     return;
1337   }
1338
1339   $dbtype =~ s/\W/_/gi;
1340
1341   my $subclass = "DBIx::Class::Storage::DBI::${conn}::${dbtype}";
1342   return if $self->isa($subclass);
1343
1344   if ($self->load_optional_class($subclass)) {
1345     bless $self, $subclass;
1346     $self->_rebless;
1347   }
1348   else {
1349     $self->_warn_undetermined_driver(
1350       'This version of DBIC does not yet seem to supply a driver for '
1351     . "your particular RDBMS and/or connection method ('$conn/$dbtype')."
1352     );
1353   }
1354 }
1355
1356 sub _warn_undetermined_driver {
1357   my ($self, $msg) = @_;
1358
1359   require Data::Dumper::Concise;
1360
1361   carp_once ($msg . ' While we will attempt to continue anyway, the results '
1362   . 'are likely to be underwhelming. Please upgrade DBIC, and if this message '
1363   . "does not go away, file a bugreport including the following info:\n"
1364   . Data::Dumper::Concise::Dumper($self->_describe_connection)
1365   );
1366 }
1367
1368 sub _do_connection_actions {
1369   my $self          = shift;
1370   my $method_prefix = shift;
1371   my $call          = shift;
1372
1373   if (not ref($call)) {
1374     my $method = $method_prefix . $call;
1375     $self->$method(@_);
1376   } elsif (ref($call) eq 'CODE') {
1377     $self->$call(@_);
1378   } elsif (ref($call) eq 'ARRAY') {
1379     if (ref($call->[0]) ne 'ARRAY') {
1380       $self->_do_connection_actions($method_prefix, $_) for @$call;
1381     } else {
1382       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1383     }
1384   } else {
1385     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1386   }
1387
1388   return $self;
1389 }
1390
1391 sub connect_call_do_sql {
1392   my $self = shift;
1393   $self->_do_query(@_);
1394 }
1395
1396 sub disconnect_call_do_sql {
1397   my $self = shift;
1398   $self->_do_query(@_);
1399 }
1400
1401 # override in db-specific backend when necessary
1402 sub connect_call_datetime_setup { 1 }
1403
1404 sub _do_query {
1405   my ($self, $action) = @_;
1406
1407   if (ref $action eq 'CODE') {
1408     $action = $action->($self);
1409     $self->_do_query($_) foreach @$action;
1410   }
1411   else {
1412     # Most debuggers expect ($sql, @bind), so we need to exclude
1413     # the attribute hash which is the second argument to $dbh->do
1414     # furthermore the bind values are usually to be presented
1415     # as named arrayref pairs, so wrap those here too
1416     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1417     my $sql = shift @do_args;
1418     my $attrs = shift @do_args;
1419     my @bind = map { [ undef, $_ ] } @do_args;
1420
1421     $self->dbh_do(sub {
1422       $_[0]->_query_start($sql, \@bind);
1423       $_[1]->do($sql, $attrs, @do_args);
1424       $_[0]->_query_end($sql, \@bind);
1425     });
1426   }
1427
1428   return $self;
1429 }
1430
1431 sub _connect {
1432   my $self = shift;
1433
1434   my $info = $self->_dbi_connect_info;
1435
1436   $self->throw_exception("You did not provide any connection_info")
1437     unless defined $info->[0];
1438
1439   my ($old_connect_via, $dbh);
1440
1441   local $DBI::connect_via = 'connect' if $INC{'Apache/DBI.pm'} && $ENV{MOD_PERL};
1442
1443   # this odd anonymous coderef dereference is in fact really
1444   # necessary to avoid the unwanted effect described in perl5
1445   # RT#75792
1446   #
1447   # in addition the coderef itself can't reside inside the try{} block below
1448   # as it somehow triggers a leak under perl -d
1449   my $dbh_error_handler_installer = sub {
1450     weaken (my $weak_self = $_[0]);
1451
1452     # the coderef is blessed so we can distinguish it from externally
1453     # supplied handles (which must be preserved)
1454     $_[1]->{HandleError} = bless sub {
1455       if ($weak_self) {
1456         $weak_self->throw_exception("DBI Exception: $_[0]");
1457       }
1458       else {
1459         # the handler may be invoked by something totally out of
1460         # the scope of DBIC
1461         DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1462       }
1463     }, '__DBIC__DBH__ERROR__HANDLER__';
1464   };
1465
1466   try {
1467     if(ref $info->[0] eq 'CODE') {
1468       $dbh = $info->[0]->();
1469     }
1470     else {
1471       require DBI;
1472       $dbh = DBI->connect(@$info);
1473     }
1474
1475     die $DBI::errstr unless $dbh;
1476
1477     die sprintf ("%s fresh DBI handle with a *false* 'Active' attribute. "
1478       . 'This handle is disconnected as far as DBIC is concerned, and we can '
1479       . 'not continue',
1480       ref $info->[0] eq 'CODE'
1481         ? "Connection coderef $info->[0] returned a"
1482         : 'DBI->connect($schema->storage->connect_info) resulted in a'
1483     ) unless $dbh->FETCH('Active');
1484
1485     # sanity checks unless asked otherwise
1486     unless ($self->unsafe) {
1487
1488       $self->throw_exception(
1489         'Refusing clobbering of {HandleError} installed on externally supplied '
1490        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1491       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1492
1493       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1494       # request, or an external handle. Complain and set anyway
1495       unless ($dbh->{RaiseError}) {
1496         carp( ref $info->[0] eq 'CODE'
1497
1498           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1499            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1500            .'attribute has been supplied'
1501
1502           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1503            .'unsafe => 1. Toggling RaiseError back to true'
1504         );
1505
1506         $dbh->{RaiseError} = 1;
1507       }
1508
1509       $dbh_error_handler_installer->($self, $dbh);
1510     }
1511   }
1512   catch {
1513     $self->throw_exception("DBI Connection failed: $_")
1514   };
1515
1516   $self->_dbh_autocommit($dbh->{AutoCommit});
1517   return $dbh;
1518 }
1519
1520 sub txn_begin {
1521   my $self = shift;
1522
1523   # this means we have not yet connected and do not know the AC status
1524   # (e.g. coderef $dbh), need a full-fledged connection check
1525   if (! defined $self->_dbh_autocommit) {
1526     $self->ensure_connected;
1527   }
1528   # Otherwise simply connect or re-connect on pid changes
1529   else {
1530     $self->_get_dbh;
1531   }
1532
1533   $self->next::method(@_);
1534 }
1535
1536 sub _exec_txn_begin {
1537   my $self = shift;
1538
1539   # if the user is utilizing txn_do - good for him, otherwise we need to
1540   # ensure that the $dbh is healthy on BEGIN.
1541   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1542   # will be replaced by a failure of begin_work itself (which will be
1543   # then retried on reconnect)
1544   if ($self->{_in_do_block}) {
1545     $self->_dbh->begin_work;
1546   } else {
1547     $self->dbh_do(sub { $_[1]->begin_work });
1548   }
1549 }
1550
1551 sub txn_commit {
1552   my $self = shift;
1553
1554   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1555   $self->throw_exception("Unable to txn_commit() on a disconnected storage")
1556     unless $self->_dbh;
1557
1558   # esoteric case for folks using external $dbh handles
1559   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1560     carp "Storage transaction_depth 0 does not match "
1561         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1562     $self->transaction_depth(1);
1563   }
1564
1565   $self->next::method(@_);
1566
1567   # if AutoCommit is disabled txn_depth never goes to 0
1568   # as a new txn is started immediately on commit
1569   $self->transaction_depth(1) if (
1570     !$self->transaction_depth
1571       and
1572     defined $self->_dbh_autocommit
1573       and
1574     ! $self->_dbh_autocommit
1575   );
1576 }
1577
1578 sub _exec_txn_commit {
1579   shift->_dbh->commit;
1580 }
1581
1582 sub txn_rollback {
1583   my $self = shift;
1584
1585   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1586   $self->throw_exception("Unable to txn_rollback() on a disconnected storage")
1587     unless $self->_dbh;
1588
1589   # esoteric case for folks using external $dbh handles
1590   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1591     carp "Storage transaction_depth 0 does not match "
1592         ."false AutoCommit of $self->{_dbh}, attempting ROLLBACK anyway";
1593     $self->transaction_depth(1);
1594   }
1595
1596   $self->next::method(@_);
1597
1598   # if AutoCommit is disabled txn_depth never goes to 0
1599   # as a new txn is started immediately on commit
1600   $self->transaction_depth(1) if (
1601     !$self->transaction_depth
1602       and
1603     defined $self->_dbh_autocommit
1604       and
1605     ! $self->_dbh_autocommit
1606   );
1607 }
1608
1609 sub _exec_txn_rollback {
1610   shift->_dbh->rollback;
1611 }
1612
1613 # generate some identical methods
1614 for my $meth (qw/svp_begin svp_release svp_rollback/) {
1615   no strict qw/refs/;
1616   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
1617     my $self = shift;
1618     $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1619     $self->throw_exception("Unable to $meth() on a disconnected storage")
1620       unless $self->_dbh;
1621     $self->next::method(@_);
1622   };
1623 }
1624
1625 # This used to be the top-half of _execute.  It was split out to make it
1626 #  easier to override in NoBindVars without duping the rest.  It takes up
1627 #  all of _execute's args, and emits $sql, @bind.
1628 sub _prep_for_execute {
1629   #my ($self, $op, $ident, $args) = @_;
1630   return shift->_gen_sql_bind(@_)
1631 }
1632
1633 sub _gen_sql_bind {
1634   my ($self, $op, $ident, $args) = @_;
1635
1636   my ($colinfos, $from);
1637   if ( blessed($ident) ) {
1638     $from = $ident->from;
1639     $colinfos = $ident->columns_info;
1640   }
1641
1642   my ($sql, $bind);
1643   ($sql, @$bind) = $self->sql_maker->$op( ($from || $ident), @$args );
1644
1645   $bind = $self->_resolve_bindattrs(
1646     $ident, [ @{$args->[2]{bind}||[]}, @$bind ], $colinfos
1647   );
1648
1649   if (
1650     ! $ENV{DBIC_DT_SEARCH_OK}
1651       and
1652     $op eq 'select'
1653       and
1654     first {
1655       length ref $_->[1]
1656         and
1657       blessed($_->[1])
1658         and
1659       $_->[1]->isa('DateTime')
1660     } @$bind
1661   ) {
1662     carp_unique 'DateTime objects passed to search() are not supported '
1663       . 'properly (InflateColumn::DateTime formats and settings are not '
1664       . 'respected.) See "Formatting DateTime objects in queries" in '
1665       . 'DBIx::Class::Manual::Cookbook. To disable this warning for good '
1666       . 'set $ENV{DBIC_DT_SEARCH_OK} to true'
1667   }
1668
1669   return( $sql, $bind );
1670 }
1671
1672 sub _resolve_bindattrs {
1673   my ($self, $ident, $bind, $colinfos) = @_;
1674
1675   $colinfos ||= {};
1676
1677   my $resolve_bindinfo = sub {
1678     #my $infohash = shift;
1679
1680     %$colinfos = %{ $self->_resolve_column_info($ident) }
1681       unless keys %$colinfos;
1682
1683     my $ret;
1684     if (my $col = $_[0]->{dbic_colname}) {
1685       $ret = { %{$_[0]} };
1686
1687       $ret->{sqlt_datatype} ||= $colinfos->{$col}{data_type}
1688         if $colinfos->{$col}{data_type};
1689
1690       $ret->{sqlt_size} ||= $colinfos->{$col}{size}
1691         if $colinfos->{$col}{size};
1692     }
1693
1694     $ret || $_[0];
1695   };
1696
1697   return [ map {
1698     my $resolved =
1699       ( ref $_ ne 'ARRAY' or @$_ != 2 ) ? [ {}, $_ ]
1700     : ( ! defined $_->[0] )             ? [ {}, $_->[1] ]
1701     : (ref $_->[0] eq 'HASH')           ? [ (exists $_->[0]{dbd_attrs} or $_->[0]{sqlt_datatype})
1702                                               ? $_->[0]
1703                                               : $resolve_bindinfo->($_->[0])
1704                                             , $_->[1] ]
1705     : (ref $_->[0] eq 'SCALAR')         ? [ { sqlt_datatype => ${$_->[0]} }, $_->[1] ]
1706     :                                     [ $resolve_bindinfo->(
1707                                               { dbic_colname => $_->[0] }
1708                                             ), $_->[1] ]
1709     ;
1710
1711     if (
1712       ! exists $resolved->[0]{dbd_attrs}
1713         and
1714       ! $resolved->[0]{sqlt_datatype}
1715         and
1716       length ref $resolved->[1]
1717         and
1718       ! overload::Method($resolved->[1], '""')
1719     ) {
1720       require Data::Dumper;
1721       local $Data::Dumper::Maxdepth = 1;
1722       local $Data::Dumper::Terse = 1;
1723       local $Data::Dumper::Useqq = 1;
1724       local $Data::Dumper::Indent = 0;
1725       local $Data::Dumper::Pad = ' ';
1726       $self->throw_exception(
1727         'You must supply a datatype/bindtype (see DBIx::Class::ResultSet/DBIC BIND VALUES) '
1728       . 'for non-scalar value '. Data::Dumper::Dumper ($resolved->[1])
1729       );
1730     }
1731
1732     $resolved;
1733
1734   } @$bind ];
1735 }
1736
1737 sub _format_for_trace {
1738   #my ($self, $bind) = @_;
1739
1740   ### Turn @bind from something like this:
1741   ###   ( [ "artist", 1 ], [ \%attrs, 3 ] )
1742   ### to this:
1743   ###   ( "'1'", "'3'" )
1744
1745   map {
1746     defined( $_ && $_->[1] )
1747       ? qq{'$_->[1]'}
1748       : q{NULL}
1749   } @{$_[1] || []};
1750 }
1751
1752 sub _query_start {
1753   my ( $self, $sql, $bind ) = @_;
1754
1755   $self->debugobj->query_start( $sql, $self->_format_for_trace($bind) )
1756     if $self->debug;
1757 }
1758
1759 sub _query_end {
1760   my ( $self, $sql, $bind ) = @_;
1761
1762   $self->debugobj->query_end( $sql, $self->_format_for_trace($bind) )
1763     if $self->debug;
1764 }
1765
1766 sub _dbi_attrs_for_bind {
1767   my ($self, $ident, $bind) = @_;
1768
1769   my @attrs;
1770
1771   for (map { $_->[0] } @$bind) {
1772     push @attrs, do {
1773       if (exists $_->{dbd_attrs}) {
1774         $_->{dbd_attrs}
1775       }
1776       elsif($_->{sqlt_datatype}) {
1777         # cache the result in the dbh_details hash, as it can not change unless
1778         # we connect to something else
1779         my $cache = $self->_dbh_details->{_datatype_map_cache} ||= {};
1780         if (not exists $cache->{$_->{sqlt_datatype}}) {
1781           $cache->{$_->{sqlt_datatype}} = $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
1782         }
1783         $cache->{$_->{sqlt_datatype}};
1784       }
1785       else {
1786         undef;  # always push something at this position
1787       }
1788     }
1789   }
1790
1791   return \@attrs;
1792 }
1793
1794 sub _execute {
1795   my ($self, $op, $ident, @args) = @_;
1796
1797   my ($sql, $bind) = $self->_prep_for_execute($op, $ident, \@args);
1798
1799   # not even a PID check - we do not care about the state of the _dbh.
1800   # All we need is to get the appropriate drivers loaded if they aren't
1801   # already so that the assumption in ad7c50fc26e holds
1802   $self->_populate_dbh unless $self->_dbh;
1803
1804   $self->dbh_do( _dbh_execute =>     # retry over disconnects
1805     $sql,
1806     $bind,
1807     $self->_dbi_attrs_for_bind($ident, $bind),
1808   );
1809 }
1810
1811 sub _dbh_execute {
1812   my ($self, $dbh, $sql, $bind, $bind_attrs) = @_;
1813
1814   $self->_query_start( $sql, $bind );
1815
1816   my $sth = $self->_bind_sth_params(
1817     $self->_prepare_sth($dbh, $sql),
1818     $bind,
1819     $bind_attrs,
1820   );
1821
1822   # Can this fail without throwing an exception anyways???
1823   my $rv = $sth->execute();
1824   $self->throw_exception(
1825     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1826   ) if !$rv;
1827
1828   $self->_query_end( $sql, $bind );
1829
1830   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1831 }
1832
1833 sub _prepare_sth {
1834   my ($self, $dbh, $sql) = @_;
1835
1836   # 3 is the if_active parameter which avoids active sth re-use
1837   my $sth = $self->disable_sth_caching
1838     ? $dbh->prepare($sql)
1839     : $dbh->prepare_cached($sql, {}, 3);
1840
1841   # XXX You would think RaiseError would make this impossible,
1842   #  but apparently that's not true :(
1843   $self->throw_exception(
1844     $dbh->errstr
1845       ||
1846     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
1847             .'an exception and/or setting $dbh->errstr',
1848       length ($sql) > 20
1849         ? substr($sql, 0, 20) . '...'
1850         : $sql
1851       ,
1852       'DBD::' . $dbh->{Driver}{Name},
1853     )
1854   ) if !$sth;
1855
1856   $sth;
1857 }
1858
1859 sub _bind_sth_params {
1860   my ($self, $sth, $bind, $bind_attrs) = @_;
1861
1862   for my $i (0 .. $#$bind) {
1863     if (ref $bind->[$i][1] eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1864       $sth->bind_param_inout(
1865         $i + 1, # bind params counts are 1-based
1866         $bind->[$i][1],
1867         $bind->[$i][0]{dbd_size} || $self->_max_column_bytesize($bind->[$i][0]), # size
1868         $bind_attrs->[$i],
1869       );
1870     }
1871     else {
1872       # FIXME SUBOPTIMAL - most likely this is not necessary at all
1873       # confirm with dbi-dev whether explicit stringification is needed
1874       my $v = ( length ref $bind->[$i][1] and overload::Method($bind->[$i][1], '""') )
1875         ? "$bind->[$i][1]"
1876         : $bind->[$i][1]
1877       ;
1878       $sth->bind_param(
1879         $i + 1,
1880         $v,
1881         $bind_attrs->[$i],
1882       );
1883     }
1884   }
1885
1886   $sth;
1887 }
1888
1889 sub _prefetch_autovalues {
1890   my ($self, $source, $colinfo, $to_insert) = @_;
1891
1892   my %values;
1893   for my $col (keys %$colinfo) {
1894     if (
1895       $colinfo->{$col}{auto_nextval}
1896         and
1897       (
1898         ! exists $to_insert->{$col}
1899           or
1900         ref $to_insert->{$col} eq 'SCALAR'
1901           or
1902         (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1903       )
1904     ) {
1905       $values{$col} = $self->_sequence_fetch(
1906         'NEXTVAL',
1907         ( $colinfo->{$col}{sequence} ||=
1908             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1909         ),
1910       );
1911     }
1912   }
1913
1914   \%values;
1915 }
1916
1917 sub insert {
1918   my ($self, $source, $to_insert) = @_;
1919
1920   my $col_infos = $source->columns_info;
1921
1922   my $prefetched_values = $self->_prefetch_autovalues($source, $col_infos, $to_insert);
1923
1924   # fuse the values, but keep a separate list of prefetched_values so that
1925   # they can be fused once again with the final return
1926   $to_insert = { %$to_insert, %$prefetched_values };
1927
1928   # FIXME - we seem to assume undef values as non-supplied. This is wrong.
1929   # Investigate what does it take to s/defined/exists/
1930   my %pcols = map { $_ => 1 } $source->primary_columns;
1931   my (%retrieve_cols, $autoinc_supplied, $retrieve_autoinc_col);
1932   for my $col ($source->columns) {
1933     if ($col_infos->{$col}{is_auto_increment}) {
1934       $autoinc_supplied ||= 1 if defined $to_insert->{$col};
1935       $retrieve_autoinc_col ||= $col unless $autoinc_supplied;
1936     }
1937
1938     # nothing to retrieve when explicit values are supplied
1939     next if (defined $to_insert->{$col} and ! (
1940       ref $to_insert->{$col} eq 'SCALAR'
1941         or
1942       (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1943     ));
1944
1945     # the 'scalar keys' is a trick to preserve the ->columns declaration order
1946     $retrieve_cols{$col} = scalar keys %retrieve_cols if (
1947       $pcols{$col}
1948         or
1949       $col_infos->{$col}{retrieve_on_insert}
1950     );
1951   };
1952
1953   local $self->{_autoinc_supplied_for_op} = $autoinc_supplied;
1954   local $self->{_perform_autoinc_retrieval} = $retrieve_autoinc_col;
1955
1956   my ($sqla_opts, @ir_container);
1957   if (%retrieve_cols and $self->_use_insert_returning) {
1958     $sqla_opts->{returning_container} = \@ir_container
1959       if $self->_use_insert_returning_bound;
1960
1961     $sqla_opts->{returning} = [
1962       sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
1963     ];
1964   }
1965
1966   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
1967
1968   my %returned_cols = %$to_insert;
1969   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
1970     @ir_container = try {
1971       local $SIG{__WARN__} = sub {};
1972       my @r = $sth->fetchrow_array;
1973       $sth->finish;
1974       @r;
1975     } unless @ir_container;
1976
1977     @returned_cols{@$retlist} = @ir_container if @ir_container;
1978   }
1979   else {
1980     # pull in PK if needed and then everything else
1981     if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
1982
1983       $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
1984         unless $self->can('last_insert_id');
1985
1986       my @pri_values = $self->last_insert_id($source, @missing_pri);
1987
1988       $self->throw_exception( "Can't get last insert id" )
1989         unless (@pri_values == @missing_pri);
1990
1991       @returned_cols{@missing_pri} = @pri_values;
1992       delete @retrieve_cols{@missing_pri};
1993     }
1994
1995     # if there is more left to pull
1996     if (%retrieve_cols) {
1997       $self->throw_exception(
1998         'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
1999       ) unless %pcols;
2000
2001       my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
2002
2003       my $cur = DBIx::Class::ResultSet->new($source, {
2004         where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
2005         select => \@left_to_fetch,
2006       })->cursor;
2007
2008       @returned_cols{@left_to_fetch} = $cur->next;
2009
2010       $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
2011         if scalar $cur->next;
2012     }
2013   }
2014
2015   return { %$prefetched_values, %returned_cols };
2016 }
2017
2018 sub insert_bulk {
2019   my ($self, $source, $cols, $data) = @_;
2020
2021   my @col_range = (0..$#$cols);
2022
2023   # FIXME SUBOPTIMAL - most likely this is not necessary at all
2024   # confirm with dbi-dev whether explicit stringification is needed
2025   #
2026   # forcibly stringify whatever is stringifiable
2027   # ResultSet::populate() hands us a copy - safe to mangle
2028   for my $r (0 .. $#$data) {
2029     for my $c (0 .. $#{$data->[$r]}) {
2030       $data->[$r][$c] = "$data->[$r][$c]"
2031         if ( length ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
2032     }
2033   }
2034
2035   my $colinfos = $source->columns_info($cols);
2036
2037   local $self->{_autoinc_supplied_for_op} =
2038     (first { $_->{is_auto_increment} } values %$colinfos)
2039       ? 1
2040       : 0
2041   ;
2042
2043   # get a slice type index based on first row of data
2044   # a "column" in this context may refer to more than one bind value
2045   # e.g. \[ '?, ?', [...], [...] ]
2046   #
2047   # construct the value type index - a description of values types for every
2048   # per-column slice of $data:
2049   #
2050   # nonexistent - nonbind literal
2051   # 0 - regular value
2052   # [] of bindattrs - resolved attribute(s) of bind(s) passed via literal+bind \[] combo
2053   #
2054   # also construct the column hash to pass to the SQL generator. For plain
2055   # (non literal) values - convert the members of the first row into a
2056   # literal+bind combo, with extra positional info in the bind attr hashref.
2057   # This will allow us to match the order properly, and is so contrived
2058   # because a user-supplied literal/bind (or something else specific to a
2059   # resultsource and/or storage driver) can inject extra binds along the
2060   # way, so one can't rely on "shift positions" ordering at all. Also we
2061   # can't just hand SQLA a set of some known "values" (e.g. hashrefs that
2062   # can be later matched up by address), because we want to supply a real
2063   # value on which perhaps e.g. datatype checks will be performed
2064   my ($proto_data, $value_type_by_col_idx);
2065   for my $i (@col_range) {
2066     my $colname = $cols->[$i];
2067     if (ref $data->[0][$i] eq 'SCALAR') {
2068       # no bind value at all - no type
2069
2070       $proto_data->{$colname} = $data->[0][$i];
2071     }
2072     elsif (ref $data->[0][$i] eq 'REF' and ref ${$data->[0][$i]} eq 'ARRAY' ) {
2073       # repack, so we don't end up mangling the original \[]
2074       my ($sql, @bind) = @${$data->[0][$i]};
2075
2076       # normalization of user supplied stuff
2077       my $resolved_bind = $self->_resolve_bindattrs(
2078         $source, \@bind, $colinfos,
2079       );
2080
2081       # store value-less (attrs only) bind info - we will be comparing all
2082       # supplied binds against this for sanity
2083       $value_type_by_col_idx->{$i} = [ map { $_->[0] } @$resolved_bind ];
2084
2085       $proto_data->{$colname} = \[ $sql, map { [
2086         # inject slice order to use for $proto_bind construction
2087           { %{$resolved_bind->[$_][0]}, _bind_data_slice_idx => $i, _literal_bind_subindex => $_+1 }
2088             =>
2089           $resolved_bind->[$_][1]
2090         ] } (0 .. $#bind)
2091       ];
2092     }
2093     else {
2094       $value_type_by_col_idx->{$i} = undef;
2095
2096       $proto_data->{$colname} = \[ '?', [
2097         { dbic_colname => $colname, _bind_data_slice_idx => $i }
2098           =>
2099         $data->[0][$i]
2100       ] ];
2101     }
2102   }
2103
2104   my ($sql, $proto_bind) = $self->_prep_for_execute (
2105     'insert',
2106     $source,
2107     [ $proto_data ],
2108   );
2109
2110   if (! @$proto_bind and keys %$value_type_by_col_idx) {
2111     # if the bindlist is empty and we had some dynamic binds, this means the
2112     # storage ate them away (e.g. the NoBindVars component) and interpolated
2113     # them directly into the SQL. This obviously can't be good for multi-inserts
2114     $self->throw_exception('Cannot insert_bulk without support for placeholders');
2115   }
2116
2117   # sanity checks
2118   # FIXME - devise a flag "no babysitting" or somesuch to shut this off
2119   #
2120   # use an error reporting closure for convenience (less to pass)
2121   my $bad_slice_report_cref = sub {
2122     my ($msg, $r_idx, $c_idx) = @_;
2123     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
2124       $msg,
2125       $cols->[$c_idx],
2126       do {
2127         require Data::Dumper::Concise;
2128         local $Data::Dumper::Maxdepth = 5;
2129         Data::Dumper::Concise::Dumper ({
2130           map { $cols->[$_] =>
2131             $data->[$r_idx][$_]
2132           } @col_range
2133         }),
2134       }
2135     );
2136   };
2137
2138   for my $col_idx (@col_range) {
2139     my $reference_val = $data->[0][$col_idx];
2140
2141     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
2142       my $val = $data->[$row_idx][$col_idx];
2143
2144       if (! exists $value_type_by_col_idx->{$col_idx}) { # literal no binds
2145         if (ref $val ne 'SCALAR') {
2146           $bad_slice_report_cref->(
2147             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
2148             $row_idx,
2149             $col_idx,
2150           );
2151         }
2152         elsif ($$val ne $$reference_val) {
2153           $bad_slice_report_cref->(
2154             "Inconsistent literal SQL value (expecting \\'$$reference_val')",
2155             $row_idx,
2156             $col_idx,
2157           );
2158         }
2159       }
2160       elsif (! defined $value_type_by_col_idx->{$col_idx} ) {  # regular non-literal value
2161         if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
2162           $bad_slice_report_cref->("Literal SQL found where a plain bind value is expected", $row_idx, $col_idx);
2163         }
2164       }
2165       else {  # binds from a \[], compare type and attrs
2166         if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
2167           $bad_slice_report_cref->(
2168             "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
2169             $row_idx,
2170             $col_idx,
2171           );
2172         }
2173         # start drilling down and bail out early on identical refs
2174         elsif (
2175           $reference_val != $val
2176             or
2177           $$reference_val != $$val
2178         ) {
2179           if (${$val}->[0] ne ${$reference_val}->[0]) {
2180             $bad_slice_report_cref->(
2181               "Inconsistent literal/bind SQL (expecting \\['${$reference_val}->[0]', ... ])",
2182               $row_idx,
2183               $col_idx,
2184             );
2185           }
2186           # need to check the bind attrs - a bind will happen only once for
2187           # the entire dataset, so any changes further down will be ignored.
2188           elsif (! Data::Compare::Compare(
2189             $value_type_by_col_idx->{$col_idx},
2190             [
2191               map
2192               { $_->[0] }
2193               @{$self->_resolve_bindattrs(
2194                 $source, [ @{$$val}[1 .. $#$$val] ], $colinfos,
2195               )}
2196             ],
2197           )) {
2198             $bad_slice_report_cref->(
2199               'Differing bind attributes on literal/bind values not supported',
2200               $row_idx,
2201               $col_idx,
2202             );
2203           }
2204         }
2205       }
2206     }
2207   }
2208
2209   # neither _dbh_execute_for_fetch, nor _dbh_execute_inserts_with_no_binds
2210   # are atomic (even if execute_for_fetch is a single call). Thus a safety
2211   # scope guard
2212   my $guard = $self->txn_scope_guard;
2213
2214   $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
2215   my $sth = $self->_prepare_sth($self->_dbh, $sql);
2216   my $rv = do {
2217     if (@$proto_bind) {
2218       # proto bind contains the information on which pieces of $data to pull
2219       # $cols is passed in only for prettier error-reporting
2220       $self->_dbh_execute_for_fetch( $source, $sth, $proto_bind, $cols, $data );
2221     }
2222     else {
2223       # bind_param_array doesn't work if there are no binds
2224       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
2225     }
2226   };
2227
2228   $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
2229
2230   $guard->commit;
2231
2232   return wantarray ? ($rv, $sth, @$proto_bind) : $rv;
2233 }
2234
2235 # execute_for_fetch is capable of returning data just fine (it means it
2236 # can be used for INSERT...RETURNING and UPDATE...RETURNING. Since this
2237 # is the void-populate fast-path we will just ignore this altogether
2238 # for the time being.
2239 sub _dbh_execute_for_fetch {
2240   my ($self, $source, $sth, $proto_bind, $cols, $data) = @_;
2241
2242   my @idx_range = ( 0 .. $#$proto_bind );
2243
2244   # If we have any bind attributes to take care of, we will bind the
2245   # proto-bind data (which will never be used by execute_for_fetch)
2246   # However since column bindtypes are "sticky", this is sufficient
2247   # to get the DBD to apply the bindtype to all values later on
2248
2249   my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
2250
2251   for my $i (@idx_range) {
2252     $sth->bind_param (
2253       $i+1, # DBI bind indexes are 1-based
2254       $proto_bind->[$i][1],
2255       $bind_attrs->[$i],
2256     ) if defined $bind_attrs->[$i];
2257   }
2258
2259   # At this point $data slots named in the _bind_data_slice_idx of
2260   # each piece of $proto_bind are either \[]s or plain values to be
2261   # passed in. Construct the dispensing coderef. *NOTE* the order
2262   # of $data will differ from this of the ?s in the SQL (due to
2263   # alphabetical ordering by colname). We actually do want to
2264   # preserve this behavior so that prepare_cached has a better
2265   # chance of matching on unrelated calls
2266
2267   my $fetch_row_idx = -1; # saner loop this way
2268   my $fetch_tuple = sub {
2269     return undef if ++$fetch_row_idx > $#$data;
2270
2271     return [ map { defined $_->{_literal_bind_subindex}
2272       ? ${ $data->[ $fetch_row_idx ]->[ $_->{_bind_data_slice_idx} ]}
2273          ->[ $_->{_literal_bind_subindex} ]
2274           ->[1]
2275       : $data->[ $fetch_row_idx ]->[ $_->{_bind_data_slice_idx} ]
2276     } map { $_->[0] } @$proto_bind];
2277   };
2278
2279   my $tuple_status = [];
2280   my ($rv, $err);
2281   try {
2282     $rv = $sth->execute_for_fetch(
2283       $fetch_tuple,
2284       $tuple_status,
2285     );
2286   }
2287   catch {
2288     $err = shift;
2289   };
2290
2291   # Not all DBDs are create equal. Some throw on error, some return
2292   # an undef $rv, and some set $sth->err - try whatever we can
2293   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
2294     ! defined $err
2295       and
2296     ( !defined $rv or $sth->err )
2297   );
2298
2299   # Statement must finish even if there was an exception.
2300   try {
2301     $sth->finish
2302   }
2303   catch {
2304     $err = shift unless defined $err
2305   };
2306
2307   if (defined $err) {
2308     my $i = 0;
2309     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
2310
2311     $self->throw_exception("Unexpected populate error: $err")
2312       if ($i > $#$tuple_status);
2313
2314     require Data::Dumper::Concise;
2315     $self->throw_exception(sprintf "execute_for_fetch() aborted with '%s' at populate slice:\n%s",
2316       ($tuple_status->[$i][1] || $err),
2317       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
2318     );
2319   }
2320
2321   return $rv;
2322 }
2323
2324 sub _dbh_execute_inserts_with_no_binds {
2325   my ($self, $sth, $count) = @_;
2326
2327   my $err;
2328   try {
2329     my $dbh = $self->_get_dbh;
2330     local $dbh->{RaiseError} = 1;
2331     local $dbh->{PrintError} = 0;
2332
2333     $sth->execute foreach 1..$count;
2334   }
2335   catch {
2336     $err = shift;
2337   };
2338
2339   # Make sure statement is finished even if there was an exception.
2340   try {
2341     $sth->finish
2342   }
2343   catch {
2344     $err = shift unless defined $err;
2345   };
2346
2347   $self->throw_exception($err) if defined $err;
2348
2349   return $count;
2350 }
2351
2352 sub update {
2353   #my ($self, $source, @args) = @_;
2354   shift->_execute('update', @_);
2355 }
2356
2357
2358 sub delete {
2359   #my ($self, $source, @args) = @_;
2360   shift->_execute('delete', @_);
2361 }
2362
2363 sub _select {
2364   my $self = shift;
2365   $self->_execute($self->_select_args(@_));
2366 }
2367
2368 sub _select_args_to_query {
2369   my $self = shift;
2370
2371   $self->throw_exception(
2372     "Unable to generate limited query representation with 'software_limit' enabled"
2373   ) if ($_[3]->{software_limit} and ($_[3]->{offset} or $_[3]->{rows}) );
2374
2375   # my ($op, $ident, $select, $cond, $rs_attrs, $rows, $offset)
2376   #  = $self->_select_args($ident, $select, $cond, $attrs);
2377   my ($op, $ident, @args) =
2378     $self->_select_args(@_);
2379
2380   # my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2381   my ($sql, $bind) = $self->_gen_sql_bind($op, $ident, \@args);
2382
2383   # reuse the bind arrayref
2384   unshift @{$bind}, "($sql)";
2385   \$bind;
2386 }
2387
2388 sub _select_args {
2389   my ($self, $ident, $select, $where, $orig_attrs) = @_;
2390
2391   # FIXME - that kind of caching would be nice to have
2392   # however currently we *may* pass the same $orig_attrs
2393   # with different ident/select/where
2394   # the whole interface needs to be rethought, since it
2395   # was centered around the flawed SQLA API. We can do
2396   # soooooo much better now. But that is also another
2397   # battle...
2398   #return (
2399   #  'select', @{$orig_attrs->{_sqlmaker_select_args}}
2400   #) if $orig_attrs->{_sqlmaker_select_args};
2401
2402   my $sql_maker = $self->sql_maker;
2403   my $alias2source = $self->_resolve_ident_sources ($ident);
2404
2405   my $attrs = {
2406     %$orig_attrs,
2407     select => $select,
2408     from => $ident,
2409     where => $where,
2410
2411     # limit dialects use this stuff
2412     # yes, some CDBICompat crap does not supply an {alias} >.<
2413     ( $orig_attrs->{alias} and $alias2source->{$orig_attrs->{alias}} )
2414       ? ( _rsroot_rsrc => $alias2source->{$orig_attrs->{alias}} )
2415       : ()
2416     ,
2417   };
2418
2419   # Sanity check the attributes (SQLMaker does it too, but
2420   # in case of a software_limit we'll never reach there)
2421   if (defined $attrs->{offset}) {
2422     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2423       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2424   }
2425
2426   if (defined $attrs->{rows}) {
2427     $self->throw_exception("The rows attribute must be a positive integer if present")
2428       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2429   }
2430   elsif ($attrs->{offset}) {
2431     # MySQL actually recommends this approach.  I cringe.
2432     $attrs->{rows} = $sql_maker->__max_int;
2433   }
2434
2435   # see if we will need to tear the prefetch apart to satisfy group_by == select
2436   # this is *extremely tricky* to get right, I am still not sure I did
2437   #
2438   my ($prefetch_needs_subquery, @limit_args);
2439
2440   if ( $attrs->{_grouped_by_distinct} and $attrs->{collapse} ) {
2441     # we already know there is a valid group_by (we made it) and we know it is
2442     # intended to be based *only* on non-multi stuff
2443     # short circuit the group_by parsing below
2444     $prefetch_needs_subquery = 1;
2445   }
2446   elsif (
2447     # The rationale is that even if we do *not* have collapse, we still
2448     # need to wrap the core grouped select/group_by in a subquery
2449     # so that databases that care about group_by/select equivalence
2450     # are happy (this includes MySQL in strict_mode)
2451     # If any of the other joined tables are referenced in the group_by
2452     # however - the user is on their own
2453     ( $prefetch_needs_subquery or $attrs->{_related_results_construction} )
2454       and
2455     $attrs->{group_by}
2456       and
2457     @{$attrs->{group_by}}
2458       and
2459     my $grp_aliases = try { # try{} because $attrs->{from} may be unreadable
2460       $self->_resolve_aliastypes_from_select_args({ from => $attrs->{from}, group_by => $attrs->{group_by} })
2461     }
2462   ) {
2463     # no aliases other than our own in group_by
2464     # if there are - do not allow subquery even if limit is present
2465     $prefetch_needs_subquery = ! scalar grep { $_ ne $attrs->{alias} } keys %{ $grp_aliases->{grouping} || {} };
2466   }
2467   elsif ( $attrs->{rows} && $attrs->{collapse} ) {
2468     # active collapse with a limit - that one is a no-brainer unless
2469     # overruled by a group_by above
2470     $prefetch_needs_subquery = 1;
2471   }
2472
2473   if ($prefetch_needs_subquery) {
2474     $attrs = $self->_adjust_select_args_for_complex_prefetch ($attrs);
2475   }
2476   elsif (! $attrs->{software_limit} ) {
2477     push @limit_args, (
2478       $attrs->{rows} || (),
2479       $attrs->{offset} || (),
2480     );
2481   }
2482
2483   # try to simplify the joinmap further (prune unreferenced type-single joins)
2484   if (
2485     ! $prefetch_needs_subquery  # already pruned
2486       and
2487     ref $attrs->{from}
2488       and
2489     reftype $attrs->{from} eq 'ARRAY'
2490       and
2491     @{$attrs->{from}} != 1
2492   ) {
2493     ($attrs->{from}, $attrs->{_aliastypes}) = $self->_prune_unused_joins ($attrs);
2494   }
2495
2496 ###
2497   # This would be the point to deflate anything found in $attrs->{where}
2498   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2499   # expect a result object. And all we have is a resultsource (it is trivial
2500   # to extract deflator coderefs via $alias2source above).
2501   #
2502   # I don't see a way forward other than changing the way deflators are
2503   # invoked, and that's just bad...
2504 ###
2505
2506   return ( 'select', @{ $orig_attrs->{_sqlmaker_select_args} = [
2507     @{$attrs}{qw(from select where)}, $attrs, @limit_args
2508   ]} );
2509 }
2510
2511 # Returns a counting SELECT for a simple count
2512 # query. Abstracted so that a storage could override
2513 # this to { count => 'firstcol' } or whatever makes
2514 # sense as a performance optimization
2515 sub _count_select {
2516   #my ($self, $source, $rs_attrs) = @_;
2517   return { count => '*' };
2518 }
2519
2520 =head2 select
2521
2522 =over 4
2523
2524 =item Arguments: $ident, $select, $condition, $attrs
2525
2526 =back
2527
2528 Handle a SQL select statement.
2529
2530 =cut
2531
2532 sub select {
2533   my $self = shift;
2534   my ($ident, $select, $condition, $attrs) = @_;
2535   return $self->cursor_class->new($self, \@_, $attrs);
2536 }
2537
2538 sub select_single {
2539   my $self = shift;
2540   my ($rv, $sth, @bind) = $self->_select(@_);
2541   my @row = $sth->fetchrow_array;
2542   my @nextrow = $sth->fetchrow_array if @row;
2543   if(@row && @nextrow) {
2544     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2545   }
2546   # Need to call finish() to work round broken DBDs
2547   $sth->finish();
2548   return @row;
2549 }
2550
2551 =head2 sql_limit_dialect
2552
2553 This is an accessor for the default SQL limit dialect used by a particular
2554 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2555 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2556 see L<DBIx::Class::SQLMaker::LimitDialects>.
2557
2558 =cut
2559
2560 sub _dbh_columns_info_for {
2561   my ($self, $dbh, $table) = @_;
2562
2563   if ($dbh->can('column_info')) {
2564     my %result;
2565     my $caught;
2566     try {
2567       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2568       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2569       $sth->execute();
2570       while ( my $info = $sth->fetchrow_hashref() ){
2571         my %column_info;
2572         $column_info{data_type}   = $info->{TYPE_NAME};
2573         $column_info{size}      = $info->{COLUMN_SIZE};
2574         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2575         $column_info{default_value} = $info->{COLUMN_DEF};
2576         my $col_name = $info->{COLUMN_NAME};
2577         $col_name =~ s/^\"(.*)\"$/$1/;
2578
2579         $result{$col_name} = \%column_info;
2580       }
2581     } catch {
2582       $caught = 1;
2583     };
2584     return \%result if !$caught && scalar keys %result;
2585   }
2586
2587   my %result;
2588   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2589   $sth->execute;
2590   my @columns = @{$sth->{NAME_lc}};
2591   for my $i ( 0 .. $#columns ){
2592     my %column_info;
2593     $column_info{data_type} = $sth->{TYPE}->[$i];
2594     $column_info{size} = $sth->{PRECISION}->[$i];
2595     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2596
2597     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2598       $column_info{data_type} = $1;
2599       $column_info{size}    = $2;
2600     }
2601
2602     $result{$columns[$i]} = \%column_info;
2603   }
2604   $sth->finish;
2605
2606   foreach my $col (keys %result) {
2607     my $colinfo = $result{$col};
2608     my $type_num = $colinfo->{data_type};
2609     my $type_name;
2610     if(defined $type_num && $dbh->can('type_info')) {
2611       my $type_info = $dbh->type_info($type_num);
2612       $type_name = $type_info->{TYPE_NAME} if $type_info;
2613       $colinfo->{data_type} = $type_name if $type_name;
2614     }
2615   }
2616
2617   return \%result;
2618 }
2619
2620 sub columns_info_for {
2621   my ($self, $table) = @_;
2622   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2623 }
2624
2625 =head2 last_insert_id
2626
2627 Return the row id of the last insert.
2628
2629 =cut
2630
2631 sub _dbh_last_insert_id {
2632     my ($self, $dbh, $source, $col) = @_;
2633
2634     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2635
2636     return $id if defined $id;
2637
2638     my $class = ref $self;
2639     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2640 }
2641
2642 sub last_insert_id {
2643   my $self = shift;
2644   $self->_dbh_last_insert_id ($self->_dbh, @_);
2645 }
2646
2647 =head2 _native_data_type
2648
2649 =over 4
2650
2651 =item Arguments: $type_name
2652
2653 =back
2654
2655 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2656 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2657 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2658
2659 The default implementation returns C<undef>, implement in your Storage driver if
2660 you need this functionality.
2661
2662 Should map types from other databases to the native RDBMS type, for example
2663 C<VARCHAR2> to C<VARCHAR>.
2664
2665 Types with modifiers should map to the underlying data type. For example,
2666 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2667
2668 Composite types should map to the container type, for example
2669 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2670
2671 =cut
2672
2673 sub _native_data_type {
2674   #my ($self, $data_type) = @_;
2675   return undef
2676 }
2677
2678 # Check if placeholders are supported at all
2679 sub _determine_supports_placeholders {
2680   my $self = shift;
2681   my $dbh  = $self->_get_dbh;
2682
2683   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2684   # but it is inaccurate more often than not
2685   return try {
2686     local $dbh->{PrintError} = 0;
2687     local $dbh->{RaiseError} = 1;
2688     $dbh->do('select ?', {}, 1);
2689     1;
2690   }
2691   catch {
2692     0;
2693   };
2694 }
2695
2696 # Check if placeholders bound to non-string types throw exceptions
2697 #
2698 sub _determine_supports_typeless_placeholders {
2699   my $self = shift;
2700   my $dbh  = $self->_get_dbh;
2701
2702   return try {
2703     local $dbh->{PrintError} = 0;
2704     local $dbh->{RaiseError} = 1;
2705     # this specifically tests a bind that is NOT a string
2706     $dbh->do('select 1 where 1 = ?', {}, 1);
2707     1;
2708   }
2709   catch {
2710     0;
2711   };
2712 }
2713
2714 =head2 sqlt_type
2715
2716 Returns the database driver name.
2717
2718 =cut
2719
2720 sub sqlt_type {
2721   shift->_get_dbh->{Driver}->{Name};
2722 }
2723
2724 =head2 bind_attribute_by_data_type
2725
2726 Given a datatype from column info, returns a database specific bind
2727 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2728 let the database planner just handle it.
2729
2730 This method is always called after the driver has been determined and a DBI
2731 connection has been established. Therefore you can refer to C<DBI::$constant>
2732 and/or C<DBD::$driver::$constant> directly, without worrying about loading
2733 the correct modules.
2734
2735 =cut
2736
2737 sub bind_attribute_by_data_type {
2738     return;
2739 }
2740
2741 =head2 is_datatype_numeric
2742
2743 Given a datatype from column_info, returns a boolean value indicating if
2744 the current RDBMS considers it a numeric value. This controls how
2745 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2746 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2747 be performed instead of the usual C<eq>.
2748
2749 =cut
2750
2751 sub is_datatype_numeric {
2752   #my ($self, $dt) = @_;
2753
2754   return 0 unless $_[1];
2755
2756   $_[1] =~ /^ (?:
2757     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2758   ) $/ix;
2759 }
2760
2761
2762 =head2 create_ddl_dir
2763
2764 =over 4
2765
2766 =item Arguments: $schema, \@databases, $version, $directory, $preversion, \%sqlt_args
2767
2768 =back
2769
2770 Creates a SQL file based on the Schema, for each of the specified
2771 database engines in C<\@databases> in the given directory.
2772 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2773
2774 Given a previous version number, this will also create a file containing
2775 the ALTER TABLE statements to transform the previous schema into the
2776 current one. Note that these statements may contain C<DROP TABLE> or
2777 C<DROP COLUMN> statements that can potentially destroy data.
2778
2779 The file names are created using the C<ddl_filename> method below, please
2780 override this method in your schema if you would like a different file
2781 name format. For the ALTER file, the same format is used, replacing
2782 $version in the name with "$preversion-$version".
2783
2784 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2785 The most common value for this would be C<< { add_drop_table => 1 } >>
2786 to have the SQL produced include a C<DROP TABLE> statement for each table
2787 created. For quoting purposes supply C<quote_identifiers>.
2788
2789 If no arguments are passed, then the following default values are assumed:
2790
2791 =over 4
2792
2793 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2794
2795 =item version    - $schema->schema_version
2796
2797 =item directory  - './'
2798
2799 =item preversion - <none>
2800
2801 =back
2802
2803 By default, C<\%sqlt_args> will have
2804
2805  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2806
2807 merged with the hash passed in. To disable any of those features, pass in a
2808 hashref like the following
2809
2810  { ignore_constraint_names => 0, # ... other options }
2811
2812
2813 WARNING: You are strongly advised to check all SQL files created, before applying
2814 them.
2815
2816 =cut
2817
2818 sub create_ddl_dir {
2819   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2820
2821   unless ($dir) {
2822     carp "No directory given, using ./\n";
2823     $dir = './';
2824   } else {
2825       -d $dir
2826         or
2827       (require File::Path and File::Path::mkpath (["$dir"]))  # mkpath does not like objects (i.e. Path::Class::Dir)
2828         or
2829       $self->throw_exception(
2830         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2831       );
2832   }
2833
2834   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2835
2836   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2837   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2838
2839   my $schema_version = $schema->schema_version || '1.x';
2840   $version ||= $schema_version;
2841
2842   $sqltargs = {
2843     add_drop_table => 1,
2844     ignore_constraint_names => 1,
2845     ignore_index_names => 1,
2846     %{$sqltargs || {}}
2847   };
2848
2849   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2850     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2851   }
2852
2853   my $sqlt = SQL::Translator->new( $sqltargs );
2854
2855   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2856   my $sqlt_schema = $sqlt->translate({ data => $schema })
2857     or $self->throw_exception ($sqlt->error);
2858
2859   foreach my $db (@$databases) {
2860     $sqlt->reset();
2861     $sqlt->{schema} = $sqlt_schema;
2862     $sqlt->producer($db);
2863
2864     my $file;
2865     my $filename = $schema->ddl_filename($db, $version, $dir);
2866     if (-e $filename && ($version eq $schema_version )) {
2867       # if we are dumping the current version, overwrite the DDL
2868       carp "Overwriting existing DDL file - $filename";
2869       unlink($filename);
2870     }
2871
2872     my $output = $sqlt->translate;
2873     if(!$output) {
2874       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2875       next;
2876     }
2877     if(!open($file, ">$filename")) {
2878       $self->throw_exception("Can't open $filename for writing ($!)");
2879       next;
2880     }
2881     print $file $output;
2882     close($file);
2883
2884     next unless ($preversion);
2885
2886     require SQL::Translator::Diff;
2887
2888     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2889     if(!-e $prefilename) {
2890       carp("No previous schema file found ($prefilename)");
2891       next;
2892     }
2893
2894     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2895     if(-e $difffile) {
2896       carp("Overwriting existing diff file - $difffile");
2897       unlink($difffile);
2898     }
2899
2900     my $source_schema;
2901     {
2902       my $t = SQL::Translator->new($sqltargs);
2903       $t->debug( 0 );
2904       $t->trace( 0 );
2905
2906       $t->parser( $db )
2907         or $self->throw_exception ($t->error);
2908
2909       my $out = $t->translate( $prefilename )
2910         or $self->throw_exception ($t->error);
2911
2912       $source_schema = $t->schema;
2913
2914       $source_schema->name( $prefilename )
2915         unless ( $source_schema->name );
2916     }
2917
2918     # The "new" style of producers have sane normalization and can support
2919     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2920     # And we have to diff parsed SQL against parsed SQL.
2921     my $dest_schema = $sqlt_schema;
2922
2923     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2924       my $t = SQL::Translator->new($sqltargs);
2925       $t->debug( 0 );
2926       $t->trace( 0 );
2927
2928       $t->parser( $db )
2929         or $self->throw_exception ($t->error);
2930
2931       my $out = $t->translate( $filename )
2932         or $self->throw_exception ($t->error);
2933
2934       $dest_schema = $t->schema;
2935
2936       $dest_schema->name( $filename )
2937         unless $dest_schema->name;
2938     }
2939
2940     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2941                                                   $dest_schema,   $db,
2942                                                   $sqltargs
2943                                                  );
2944     if(!open $file, ">$difffile") {
2945       $self->throw_exception("Can't write to $difffile ($!)");
2946       next;
2947     }
2948     print $file $diff;
2949     close($file);
2950   }
2951 }
2952
2953 =head2 deployment_statements
2954
2955 =over 4
2956
2957 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2958
2959 =back
2960
2961 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2962
2963 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2964 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2965
2966 C<$directory> is used to return statements from files in a previously created
2967 L</create_ddl_dir> directory and is optional. The filenames are constructed
2968 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2969
2970 If no C<$directory> is specified then the statements are constructed on the
2971 fly using L<SQL::Translator> and C<$version> is ignored.
2972
2973 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2974
2975 =cut
2976
2977 sub deployment_statements {
2978   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2979   $type ||= $self->sqlt_type;
2980   $version ||= $schema->schema_version || '1.x';
2981   $dir ||= './';
2982   my $filename = $schema->ddl_filename($type, $version, $dir);
2983   if(-f $filename)
2984   {
2985       # FIXME replace this block when a proper sane sql parser is available
2986       my $file;
2987       open($file, "<$filename")
2988         or $self->throw_exception("Can't open $filename ($!)");
2989       my @rows = <$file>;
2990       close($file);
2991       return join('', @rows);
2992   }
2993
2994   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2995     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2996   }
2997
2998   # sources needs to be a parser arg, but for simplicity allow at top level
2999   # coming in
3000   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
3001       if exists $sqltargs->{sources};
3002
3003   my $tr = SQL::Translator->new(
3004     producer => "SQL::Translator::Producer::${type}",
3005     %$sqltargs,
3006     parser => 'SQL::Translator::Parser::DBIx::Class',
3007     data => $schema,
3008   );
3009
3010   return preserve_context {
3011     $tr->translate
3012   } after => sub {
3013     $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
3014       unless defined $_[0];
3015   };
3016 }
3017
3018 # FIXME deploy() currently does not accurately report sql errors
3019 # Will always return true while errors are warned
3020 sub deploy {
3021   my ($self, $schema, $type, $sqltargs, $dir) = @_;
3022   my $deploy = sub {
3023     my $line = shift;
3024     return if(!$line);
3025     return if($line =~ /^--/);
3026     # next if($line =~ /^DROP/m);
3027     return if($line =~ /^BEGIN TRANSACTION/m);
3028     return if($line =~ /^COMMIT/m);
3029     return if $line =~ /^\s+$/; # skip whitespace only
3030     $self->_query_start($line);
3031     try {
3032       # do a dbh_do cycle here, as we need some error checking in
3033       # place (even though we will ignore errors)
3034       $self->dbh_do (sub { $_[1]->do($line) });
3035     } catch {
3036       carp qq{$_ (running "${line}")};
3037     };
3038     $self->_query_end($line);
3039   };
3040   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
3041   if (@statements > 1) {
3042     foreach my $statement (@statements) {
3043       $deploy->( $statement );
3044     }
3045   }
3046   elsif (@statements == 1) {
3047     # split on single line comments and end of statements
3048     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
3049       $deploy->( $line );
3050     }
3051   }
3052 }
3053
3054 =head2 datetime_parser
3055
3056 Returns the datetime parser class
3057
3058 =cut
3059
3060 sub datetime_parser {
3061   my $self = shift;
3062   return $self->{datetime_parser} ||= do {
3063     $self->build_datetime_parser(@_);
3064   };
3065 }
3066
3067 =head2 datetime_parser_type
3068
3069 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
3070
3071 =head2 build_datetime_parser
3072
3073 See L</datetime_parser>
3074
3075 =cut
3076
3077 sub build_datetime_parser {
3078   my $self = shift;
3079   my $type = $self->datetime_parser_type(@_);
3080   return $type;
3081 }
3082
3083
3084 =head2 is_replicating
3085
3086 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
3087 replicate from a master database.  Default is undef, which is the result
3088 returned by databases that don't support replication.
3089
3090 =cut
3091
3092 sub is_replicating {
3093     return;
3094
3095 }
3096
3097 =head2 lag_behind_master
3098
3099 Returns a number that represents a certain amount of lag behind a master db
3100 when a given storage is replicating.  The number is database dependent, but
3101 starts at zero and increases with the amount of lag. Default in undef
3102
3103 =cut
3104
3105 sub lag_behind_master {
3106     return;
3107 }
3108
3109 =head2 relname_to_table_alias
3110
3111 =over 4
3112
3113 =item Arguments: $relname, $join_count
3114
3115 =item Return Value: $alias
3116
3117 =back
3118
3119 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
3120 queries.
3121
3122 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
3123 way these aliases are named.
3124
3125 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
3126 otherwise C<"$relname">.
3127
3128 =cut
3129
3130 sub relname_to_table_alias {
3131   my ($self, $relname, $join_count) = @_;
3132
3133   my $alias = ($join_count && $join_count > 1 ?
3134     join('_', $relname, $join_count) : $relname);
3135
3136   return $alias;
3137 }
3138
3139 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
3140 # version and it may be necessary to amend or override it for a specific storage
3141 # if such binds are necessary.
3142 sub _max_column_bytesize {
3143   my ($self, $attr) = @_;
3144
3145   my $max_size;
3146
3147   if ($attr->{sqlt_datatype}) {
3148     my $data_type = lc($attr->{sqlt_datatype});
3149
3150     if ($attr->{sqlt_size}) {
3151
3152       # String/sized-binary types
3153       if ($data_type =~ /^(?:
3154           l? (?:var)? char(?:acter)? (?:\s*varying)?
3155             |
3156           (?:var)? binary (?:\s*varying)?
3157             |
3158           raw
3159         )\b/x
3160       ) {
3161         $max_size = $attr->{sqlt_size};
3162       }
3163       # Other charset/unicode types, assume scale of 4
3164       elsif ($data_type =~ /^(?:
3165           national \s* character (?:\s*varying)?
3166             |
3167           nchar
3168             |
3169           univarchar
3170             |
3171           nvarchar
3172         )\b/x
3173       ) {
3174         $max_size = $attr->{sqlt_size} * 4;
3175       }
3176     }
3177
3178     if (!$max_size and !$self->_is_lob_type($data_type)) {
3179       $max_size = 100 # for all other (numeric?) datatypes
3180     }
3181   }
3182
3183   $max_size || $self->_dbic_connect_attributes->{LongReadLen} || $self->_get_dbh->{LongReadLen} || 8000;
3184 }
3185
3186 # Determine if a data_type is some type of BLOB
3187 sub _is_lob_type {
3188   my ($self, $data_type) = @_;
3189   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
3190     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
3191                                   |varchar|character\s*varying|nvarchar
3192                                   |national\s*character\s*varying))?\z/xi);
3193 }
3194
3195 sub _is_binary_lob_type {
3196   my ($self, $data_type) = @_;
3197   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
3198     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
3199 }
3200
3201 sub _is_text_lob_type {
3202   my ($self, $data_type) = @_;
3203   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
3204     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
3205                         |national\s*character\s*varying))\z/xi);
3206 }
3207
3208 # Determine if a data_type is some type of a binary type
3209 sub _is_binary_type {
3210   my ($self, $data_type) = @_;
3211   $data_type && ($self->_is_binary_lob_type($data_type)
3212     || $data_type =~ /(?:var)?(?:binary|bit|graphic)(?:\s*varying)?/i);
3213 }
3214
3215 1;
3216
3217 =head1 USAGE NOTES
3218
3219 =head2 DBIx::Class and AutoCommit
3220
3221 DBIx::Class can do some wonderful magic with handling exceptions,
3222 disconnections, and transactions when you use C<< AutoCommit => 1 >>
3223 (the default) combined with L<txn_do|DBIx::Class::Storage/txn_do> for
3224 transaction support.
3225
3226 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
3227 in an assumed transaction between commits, and you're telling us you'd
3228 like to manage that manually.  A lot of the magic protections offered by
3229 this module will go away.  We can't protect you from exceptions due to database
3230 disconnects because we don't know anything about how to restart your
3231 transactions.  You're on your own for handling all sorts of exceptional
3232 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
3233 be with raw DBI.
3234
3235
3236 =head1 AUTHOR AND CONTRIBUTORS
3237
3238 See L<AUTHOR|DBIx::Class/AUTHOR> and L<CONTRIBUTORS|DBIx::Class/CONTRIBUTORS> in DBIx::Class
3239
3240 =head1 LICENSE
3241
3242 You may distribute this code under the same terms as Perl itself.
3243
3244 =cut