Stop various CLONE-registries from growing indefinitely
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use Scalar::Util qw/refaddr weaken reftype blessed/;
12 use List::Util qw/first/;
13 use Sub::Name 'subname';
14 use Context::Preserve 'preserve_context';
15 use Try::Tiny;
16 use overload ();
17 use Data::Compare (); # no imports!!! guard against insane architecture
18 use namespace::clean;
19
20 # default cursor class, overridable in connect_info attributes
21 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
22
23 __PACKAGE__->mk_group_accessors('inherited' => qw/
24   sql_limit_dialect sql_quote_char sql_name_sep
25 /);
26
27 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
28
29 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
30 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
31
32 __PACKAGE__->sql_name_sep('.');
33
34 __PACKAGE__->mk_group_accessors('simple' => qw/
35   _connect_info _dbic_connect_attributes _driver_determined
36   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts _dbh_autocommit
37   _perform_autoinc_retrieval _autoinc_supplied_for_op
38 /);
39
40 # the values for these accessors are picked out (and deleted) from
41 # the attribute hashref passed to connect_info
42 my @storage_options = qw/
43   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
44   disable_sth_caching unsafe auto_savepoint
45 /;
46 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
47
48
49 # capability definitions, using a 2-tiered accessor system
50 # The rationale is:
51 #
52 # A driver/user may define _use_X, which blindly without any checks says:
53 # "(do not) use this capability", (use_dbms_capability is an "inherited"
54 # type accessor)
55 #
56 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
57 # accessor, which in turn calls _determine_supports_X, and stores the return
58 # in a special slot on the storage object, which is wiped every time a $dbh
59 # reconnection takes place (it is not guaranteed that upon reconnection we
60 # will get the same rdbms version). _determine_supports_X does not need to
61 # exist on a driver, as we ->can for it before calling.
62
63 my @capabilities = (qw/
64   insert_returning
65   insert_returning_bound
66
67   multicolumn_in
68
69   placeholders
70   typeless_placeholders
71
72   join_optimizer
73 /);
74 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
75 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
76
77 # on by default, not strictly a capability (pending rewrite)
78 __PACKAGE__->_use_join_optimizer (1);
79 sub _determine_supports_join_optimizer { 1 };
80
81 # Each of these methods need _determine_driver called before itself
82 # in order to function reliably. This is a purely DRY optimization
83 #
84 # get_(use)_dbms_capability need to be called on the correct Storage
85 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
86 # _determine_supports_X which obv. needs a correct driver as well
87 my @rdbms_specific_methods = qw/
88   sqlt_type
89   deployment_statements
90
91   sql_maker
92   cursor_class
93
94   build_datetime_parser
95   datetime_parser_type
96
97   txn_begin
98
99   insert
100   insert_bulk
101   update
102   delete
103   select
104   select_single
105
106   with_deferred_fk_checks
107
108   get_use_dbms_capability
109   get_dbms_capability
110
111   _server_info
112   _get_server_version
113 /;
114
115 for my $meth (@rdbms_specific_methods) {
116
117   my $orig = __PACKAGE__->can ($meth)
118     or die "$meth is not a ::Storage::DBI method!";
119
120   no strict qw/refs/;
121   no warnings qw/redefine/;
122   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
123     if (
124       # only fire when invoked on an instance, a valid class-based invocation
125       # would e.g. be setting a default for an inherited accessor
126       ref $_[0]
127         and
128       ! $_[0]->{_driver_determined}
129         and
130       ! $_[0]->{_in_determine_driver}
131         and
132       # Only try to determine stuff if we have *something* that either is or can
133       # provide a DSN. Allows for bare $schema's generated with a plain ->connect()
134       # to still be marginally useful
135       $_[0]->_dbi_connect_info->[0]
136     ) {
137       $_[0]->_determine_driver;
138
139       # This for some reason crashes and burns on perl 5.8.1
140       # IFF the method ends up throwing an exception
141       #goto $_[0]->can ($meth);
142
143       my $cref = $_[0]->can ($meth);
144       goto $cref;
145     }
146
147     goto $orig;
148   };
149 }
150
151 =head1 NAME
152
153 DBIx::Class::Storage::DBI - DBI storage handler
154
155 =head1 SYNOPSIS
156
157   my $schema = MySchema->connect('dbi:SQLite:my.db');
158
159   $schema->storage->debug(1);
160
161   my @stuff = $schema->storage->dbh_do(
162     sub {
163       my ($storage, $dbh, @args) = @_;
164       $dbh->do("DROP TABLE authors");
165     },
166     @column_list
167   );
168
169   $schema->resultset('Book')->search({
170      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
171   });
172
173 =head1 DESCRIPTION
174
175 This class represents the connection to an RDBMS via L<DBI>.  See
176 L<DBIx::Class::Storage> for general information.  This pod only
177 documents DBI-specific methods and behaviors.
178
179 =head1 METHODS
180
181 =cut
182
183 sub new {
184   my $new = shift->next::method(@_);
185
186   $new->_sql_maker_opts({});
187   $new->_dbh_details({});
188   $new->{_in_do_block} = 0;
189
190   # read below to see what this does
191   $new->_arm_global_destructor;
192
193   $new;
194 }
195
196 # This is hack to work around perl shooting stuff in random
197 # order on exit(). If we do not walk the remaining storage
198 # objects in an END block, there is a *small but real* chance
199 # of a fork()ed child to kill the parent's shared DBI handle,
200 # *before perl reaches the DESTROY in this package*
201 # Yes, it is ugly and effective.
202 # Additionally this registry is used by the CLONE method to
203 # make sure no handles are shared between threads
204 {
205   my %seek_and_destroy;
206
207   sub _arm_global_destructor {
208
209     # quick "garbage collection" pass - prevents the registry
210     # from slowly growing with a bunch of undef-valued keys
211     defined $seek_and_destroy{$_} or delete $seek_and_destroy{$_}
212       for keys %seek_and_destroy;
213
214     weaken (
215       $seek_and_destroy{ refaddr($_[0]) } = $_[0]
216     );
217   }
218
219   END {
220     local $?; # just in case the DBI destructor changes it somehow
221
222     # destroy just the object if not native to this process
223     $_->_verify_pid for (grep
224       { defined $_ }
225       values %seek_and_destroy
226     );
227   }
228
229   sub CLONE {
230     # As per DBI's recommendation, DBIC disconnects all handles as
231     # soon as possible (DBIC will reconnect only on demand from within
232     # the thread)
233     my @instances = grep { defined $_ } values %seek_and_destroy;
234     %seek_and_destroy = ();
235
236     for (@instances) {
237       $_->_dbh(undef);
238
239       $_->transaction_depth(0);
240       $_->savepoints([]);
241
242       # properly renumber existing refs
243       $_->_arm_global_destructor
244     }
245   }
246 }
247
248 sub DESTROY {
249   my $self = shift;
250
251   # some databases spew warnings on implicit disconnect
252   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
253   local $SIG{__WARN__} = sub {};
254   $self->_dbh(undef);
255
256   # this op is necessary, since the very last perl runtime statement
257   # triggers a global destruction shootout, and the $SIG localization
258   # may very well be destroyed before perl actually gets to do the
259   # $dbh undef
260   1;
261 }
262
263 # handle pid changes correctly - do not destroy parent's connection
264 sub _verify_pid {
265   my $self = shift;
266
267   my $pid = $self->_conn_pid;
268   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
269     $dbh->{InactiveDestroy} = 1;
270     $self->_dbh(undef);
271     $self->transaction_depth(0);
272     $self->savepoints([]);
273   }
274
275   return;
276 }
277
278 =head2 connect_info
279
280 This method is normally called by L<DBIx::Class::Schema/connection>, which
281 encapsulates its argument list in an arrayref before passing them here.
282
283 The argument list may contain:
284
285 =over
286
287 =item *
288
289 The same 4-element argument set one would normally pass to
290 L<DBI/connect>, optionally followed by
291 L<extra attributes|/DBIx::Class specific connection attributes>
292 recognized by DBIx::Class:
293
294   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
295
296 =item *
297
298 A single code reference which returns a connected
299 L<DBI database handle|DBI/connect> optionally followed by
300 L<extra attributes|/DBIx::Class specific connection attributes> recognized
301 by DBIx::Class:
302
303   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
304
305 =item *
306
307 A single hashref with all the attributes and the dsn/user/password
308 mixed together:
309
310   $connect_info_args = [{
311     dsn => $dsn,
312     user => $user,
313     password => $pass,
314     %dbi_attributes,
315     %extra_attributes,
316   }];
317
318   $connect_info_args = [{
319     dbh_maker => sub { DBI->connect (...) },
320     %dbi_attributes,
321     %extra_attributes,
322   }];
323
324 This is particularly useful for L<Catalyst> based applications, allowing the
325 following config (L<Config::General> style):
326
327   <Model::DB>
328     schema_class   App::DB
329     <connect_info>
330       dsn          dbi:mysql:database=test
331       user         testuser
332       password     TestPass
333       AutoCommit   1
334     </connect_info>
335   </Model::DB>
336
337 The C<dsn>/C<user>/C<password> combination can be substituted by the
338 C<dbh_maker> key whose value is a coderef that returns a connected
339 L<DBI database handle|DBI/connect>
340
341 =back
342
343 Please note that the L<DBI> docs recommend that you always explicitly
344 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
345 recommends that it be set to I<1>, and that you perform transactions
346 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
347 to I<1> if you do not do explicitly set it to zero.  This is the default
348 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
349
350 =head3 DBIx::Class specific connection attributes
351
352 In addition to the standard L<DBI|DBI/ATTRIBUTES COMMON TO ALL HANDLES>
353 L<connection|DBI/Database Handle Attributes> attributes, DBIx::Class recognizes
354 the following connection options. These options can be mixed in with your other
355 L<DBI> connection attributes, or placed in a separate hashref
356 (C<\%extra_attributes>) as shown above.
357
358 Every time C<connect_info> is invoked, any previous settings for
359 these options will be cleared before setting the new ones, regardless of
360 whether any options are specified in the new C<connect_info>.
361
362
363 =over
364
365 =item on_connect_do
366
367 Specifies things to do immediately after connecting or re-connecting to
368 the database.  Its value may contain:
369
370 =over
371
372 =item a scalar
373
374 This contains one SQL statement to execute.
375
376 =item an array reference
377
378 This contains SQL statements to execute in order.  Each element contains
379 a string or a code reference that returns a string.
380
381 =item a code reference
382
383 This contains some code to execute.  Unlike code references within an
384 array reference, its return value is ignored.
385
386 =back
387
388 =item on_disconnect_do
389
390 Takes arguments in the same form as L</on_connect_do> and executes them
391 immediately before disconnecting from the database.
392
393 Note, this only runs if you explicitly call L</disconnect> on the
394 storage object.
395
396 =item on_connect_call
397
398 A more generalized form of L</on_connect_do> that calls the specified
399 C<connect_call_METHOD> methods in your storage driver.
400
401   on_connect_do => 'select 1'
402
403 is equivalent to:
404
405   on_connect_call => [ [ do_sql => 'select 1' ] ]
406
407 Its values may contain:
408
409 =over
410
411 =item a scalar
412
413 Will call the C<connect_call_METHOD> method.
414
415 =item a code reference
416
417 Will execute C<< $code->($storage) >>
418
419 =item an array reference
420
421 Each value can be a method name or code reference.
422
423 =item an array of arrays
424
425 For each array, the first item is taken to be the C<connect_call_> method name
426 or code reference, and the rest are parameters to it.
427
428 =back
429
430 Some predefined storage methods you may use:
431
432 =over
433
434 =item do_sql
435
436 Executes a SQL string or a code reference that returns a SQL string. This is
437 what L</on_connect_do> and L</on_disconnect_do> use.
438
439 It can take:
440
441 =over
442
443 =item a scalar
444
445 Will execute the scalar as SQL.
446
447 =item an arrayref
448
449 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
450 attributes hashref and bind values.
451
452 =item a code reference
453
454 Will execute C<< $code->($storage) >> and execute the return array refs as
455 above.
456
457 =back
458
459 =item datetime_setup
460
461 Execute any statements necessary to initialize the database session to return
462 and accept datetime/timestamp values used with
463 L<DBIx::Class::InflateColumn::DateTime>.
464
465 Only necessary for some databases, see your specific storage driver for
466 implementation details.
467
468 =back
469
470 =item on_disconnect_call
471
472 Takes arguments in the same form as L</on_connect_call> and executes them
473 immediately before disconnecting from the database.
474
475 Calls the C<disconnect_call_METHOD> methods as opposed to the
476 C<connect_call_METHOD> methods called by L</on_connect_call>.
477
478 Note, this only runs if you explicitly call L</disconnect> on the
479 storage object.
480
481 =item disable_sth_caching
482
483 If set to a true value, this option will disable the caching of
484 statement handles via L<DBI/prepare_cached>.
485
486 =item limit_dialect
487
488 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
489 default L</sql_limit_dialect> setting of the storage (if any). For a list
490 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
491
492 =item quote_names
493
494 When true automatically sets L</quote_char> and L</name_sep> to the characters
495 appropriate for your particular RDBMS. This option is preferred over specifying
496 L</quote_char> directly.
497
498 =item quote_char
499
500 Specifies what characters to use to quote table and column names.
501
502 C<quote_char> expects either a single character, in which case is it
503 is placed on either side of the table/column name, or an arrayref of length
504 2 in which case the table/column name is placed between the elements.
505
506 For example under MySQL you should use C<< quote_char => '`' >>, and for
507 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
508
509 =item name_sep
510
511 This parameter is only useful in conjunction with C<quote_char>, and is used to
512 specify the character that separates elements (schemas, tables, columns) from
513 each other. If unspecified it defaults to the most commonly used C<.>.
514
515 =item unsafe
516
517 This Storage driver normally installs its own C<HandleError>, sets
518 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
519 all database handles, including those supplied by a coderef.  It does this
520 so that it can have consistent and useful error behavior.
521
522 If you set this option to a true value, Storage will not do its usual
523 modifications to the database handle's attributes, and instead relies on
524 the settings in your connect_info DBI options (or the values you set in
525 your connection coderef, in the case that you are connecting via coderef).
526
527 Note that your custom settings can cause Storage to malfunction,
528 especially if you set a C<HandleError> handler that suppresses exceptions
529 and/or disable C<RaiseError>.
530
531 =item auto_savepoint
532
533 If this option is true, L<DBIx::Class> will use savepoints when nesting
534 transactions, making it possible to recover from failure in the inner
535 transaction without having to abort all outer transactions.
536
537 =item cursor_class
538
539 Use this argument to supply a cursor class other than the default
540 L<DBIx::Class::Storage::DBI::Cursor>.
541
542 =back
543
544 Some real-life examples of arguments to L</connect_info> and
545 L<DBIx::Class::Schema/connect>
546
547   # Simple SQLite connection
548   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
549
550   # Connect via subref
551   ->connect_info([ sub { DBI->connect(...) } ]);
552
553   # Connect via subref in hashref
554   ->connect_info([{
555     dbh_maker => sub { DBI->connect(...) },
556     on_connect_do => 'alter session ...',
557   }]);
558
559   # A bit more complicated
560   ->connect_info(
561     [
562       'dbi:Pg:dbname=foo',
563       'postgres',
564       'my_pg_password',
565       { AutoCommit => 1 },
566       { quote_char => q{"} },
567     ]
568   );
569
570   # Equivalent to the previous example
571   ->connect_info(
572     [
573       'dbi:Pg:dbname=foo',
574       'postgres',
575       'my_pg_password',
576       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
577     ]
578   );
579
580   # Same, but with hashref as argument
581   # See parse_connect_info for explanation
582   ->connect_info(
583     [{
584       dsn         => 'dbi:Pg:dbname=foo',
585       user        => 'postgres',
586       password    => 'my_pg_password',
587       AutoCommit  => 1,
588       quote_char  => q{"},
589       name_sep    => q{.},
590     }]
591   );
592
593   # Subref + DBIx::Class-specific connection options
594   ->connect_info(
595     [
596       sub { DBI->connect(...) },
597       {
598           quote_char => q{`},
599           name_sep => q{@},
600           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
601           disable_sth_caching => 1,
602       },
603     ]
604   );
605
606
607
608 =cut
609
610 sub connect_info {
611   my ($self, $info) = @_;
612
613   return $self->_connect_info if !$info;
614
615   $self->_connect_info($info); # copy for _connect_info
616
617   $info = $self->_normalize_connect_info($info)
618     if ref $info eq 'ARRAY';
619
620   my %attrs = (
621     %{ $self->_default_dbi_connect_attributes || {} },
622     %{ $info->{attributes} || {} },
623   );
624
625   my @args = @{ $info->{arguments} };
626
627   if (keys %attrs and ref $args[0] ne 'CODE') {
628     carp_unique (
629         'You provided explicit AutoCommit => 0 in your connection_info. '
630       . 'This is almost universally a bad idea (see the footnotes of '
631       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
632       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
633       . 'this warning.'
634     ) if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
635
636     push @args, \%attrs if keys %attrs;
637   }
638
639   # this is the authoritative "always an arrayref" thing fed to DBI->connect
640   # OR a single-element coderef-based $dbh factory
641   $self->_dbi_connect_info(\@args);
642
643   # extract the individual storage options
644   for my $storage_opt (keys %{ $info->{storage_options} }) {
645     my $value = $info->{storage_options}{$storage_opt};
646
647     $self->$storage_opt($value);
648   }
649
650   # Extract the individual sqlmaker options
651   #
652   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
653   #  the new set of options
654   $self->_sql_maker(undef);
655   $self->_sql_maker_opts({});
656
657   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
658     my $value = $info->{sql_maker_options}{$sql_maker_opt};
659
660     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
661   }
662
663   # FIXME - dirty:
664   # save attributes in a separate accessor so they are always
665   # introspectable, even in case of a CODE $dbhmaker
666   $self->_dbic_connect_attributes (\%attrs);
667
668   return $self->_connect_info;
669 }
670
671 sub _dbi_connect_info {
672   my $self = shift;
673
674   return $self->{_dbi_connect_info} = $_[0]
675     if @_;
676
677   my $conninfo = $self->{_dbi_connect_info} || [];
678
679   # last ditch effort to grab a DSN
680   if ( ! defined $conninfo->[0] and $ENV{DBI_DSN} ) {
681     my @new_conninfo = @$conninfo;
682     $new_conninfo[0] = $ENV{DBI_DSN};
683     $conninfo = \@new_conninfo;
684   }
685
686   return $conninfo;
687 }
688
689
690 sub _normalize_connect_info {
691   my ($self, $info_arg) = @_;
692   my %info;
693
694   my @args = @$info_arg;  # take a shallow copy for further mutilation
695
696   # combine/pre-parse arguments depending on invocation style
697
698   my %attrs;
699   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
700     %attrs = %{ $args[1] || {} };
701     @args = $args[0];
702   }
703   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
704     %attrs = %{$args[0]};
705     @args = ();
706     if (my $code = delete $attrs{dbh_maker}) {
707       @args = $code;
708
709       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
710       if (@ignored) {
711         carp sprintf (
712             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
713           . "to the result of 'dbh_maker'",
714
715           join (', ', map { "'$_'" } (@ignored) ),
716         );
717       }
718     }
719     else {
720       @args = delete @attrs{qw/dsn user password/};
721     }
722   }
723   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
724     %attrs = (
725       % { $args[3] || {} },
726       % { $args[4] || {} },
727     );
728     @args = @args[0,1,2];
729   }
730
731   $info{arguments} = \@args;
732
733   my @storage_opts = grep exists $attrs{$_},
734     @storage_options, 'cursor_class';
735
736   @{ $info{storage_options} }{@storage_opts} =
737     delete @attrs{@storage_opts} if @storage_opts;
738
739   my @sql_maker_opts = grep exists $attrs{$_},
740     qw/limit_dialect quote_char name_sep quote_names/;
741
742   @{ $info{sql_maker_options} }{@sql_maker_opts} =
743     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
744
745   $info{attributes} = \%attrs if %attrs;
746
747   return \%info;
748 }
749
750 sub _default_dbi_connect_attributes () {
751   +{
752     AutoCommit => 1,
753     PrintError => 0,
754     RaiseError => 1,
755     ShowErrorStatement => 1,
756   };
757 }
758
759 =head2 on_connect_do
760
761 This method is deprecated in favour of setting via L</connect_info>.
762
763 =cut
764
765 =head2 on_disconnect_do
766
767 This method is deprecated in favour of setting via L</connect_info>.
768
769 =cut
770
771 sub _parse_connect_do {
772   my ($self, $type) = @_;
773
774   my $val = $self->$type;
775   return () if not defined $val;
776
777   my @res;
778
779   if (not ref($val)) {
780     push @res, [ 'do_sql', $val ];
781   } elsif (ref($val) eq 'CODE') {
782     push @res, $val;
783   } elsif (ref($val) eq 'ARRAY') {
784     push @res, map { [ 'do_sql', $_ ] } @$val;
785   } else {
786     $self->throw_exception("Invalid type for $type: ".ref($val));
787   }
788
789   return \@res;
790 }
791
792 =head2 dbh_do
793
794 Arguments: ($subref | $method_name), @extra_coderef_args?
795
796 Execute the given $subref or $method_name using the new exception-based
797 connection management.
798
799 The first two arguments will be the storage object that C<dbh_do> was called
800 on and a database handle to use.  Any additional arguments will be passed
801 verbatim to the called subref as arguments 2 and onwards.
802
803 Using this (instead of $self->_dbh or $self->dbh) ensures correct
804 exception handling and reconnection (or failover in future subclasses).
805
806 Your subref should have no side-effects outside of the database, as
807 there is the potential for your subref to be partially double-executed
808 if the database connection was stale/dysfunctional.
809
810 Example:
811
812   my @stuff = $schema->storage->dbh_do(
813     sub {
814       my ($storage, $dbh, @cols) = @_;
815       my $cols = join(q{, }, @cols);
816       $dbh->selectrow_array("SELECT $cols FROM foo");
817     },
818     @column_list
819   );
820
821 =cut
822
823 sub dbh_do {
824   my $self = shift;
825   my $run_target = shift;
826
827   # short circuit when we know there is no need for a runner
828   #
829   # FIXME - assumption may be wrong
830   # the rationale for the txn_depth check is that if this block is a part
831   # of a larger transaction, everything up to that point is screwed anyway
832   return $self->$run_target($self->_get_dbh, @_)
833     if $self->{_in_do_block} or $self->transaction_depth;
834
835   # take a ref instead of a copy, to preserve @_ aliasing
836   # semantics within the coderef, but only if needed
837   # (pseudoforking doesn't like this trick much)
838   my $args = @_ ? \@_ : [];
839
840   DBIx::Class::Storage::BlockRunner->new(
841     storage => $self,
842     run_code => sub { $self->$run_target ($self->_get_dbh, @$args ) },
843     wrap_txn => 0,
844     retry_handler => sub { ! ( $_[0]->retried_count or $_[0]->storage->connected ) },
845   )->run;
846 }
847
848 sub txn_do {
849   $_[0]->_get_dbh; # connects or reconnects on pid change, necessary to grab correct txn_depth
850   shift->next::method(@_);
851 }
852
853 =head2 disconnect
854
855 Our C<disconnect> method also performs a rollback first if the
856 database is not in C<AutoCommit> mode.
857
858 =cut
859
860 sub disconnect {
861   my ($self) = @_;
862
863   if( $self->_dbh ) {
864     my @actions;
865
866     push @actions, ( $self->on_disconnect_call || () );
867     push @actions, $self->_parse_connect_do ('on_disconnect_do');
868
869     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
870
871     # stops the "implicit rollback on disconnect" warning
872     $self->_exec_txn_rollback unless $self->_dbh_autocommit;
873
874     %{ $self->_dbh->{CachedKids} } = ();
875     $self->_dbh->disconnect;
876     $self->_dbh(undef);
877   }
878 }
879
880 =head2 with_deferred_fk_checks
881
882 =over 4
883
884 =item Arguments: C<$coderef>
885
886 =item Return Value: The return value of $coderef
887
888 =back
889
890 Storage specific method to run the code ref with FK checks deferred or
891 in MySQL's case disabled entirely.
892
893 =cut
894
895 # Storage subclasses should override this
896 sub with_deferred_fk_checks {
897   my ($self, $sub) = @_;
898   $sub->();
899 }
900
901 =head2 connected
902
903 =over
904
905 =item Arguments: none
906
907 =item Return Value: 1|0
908
909 =back
910
911 Verifies that the current database handle is active and ready to execute
912 an SQL statement (e.g. the connection did not get stale, server is still
913 answering, etc.) This method is used internally by L</dbh>.
914
915 =cut
916
917 sub connected {
918   my $self = shift;
919   return 0 unless $self->_seems_connected;
920
921   #be on the safe side
922   local $self->_dbh->{RaiseError} = 1;
923
924   return $self->_ping;
925 }
926
927 sub _seems_connected {
928   my $self = shift;
929
930   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
931
932   my $dbh = $self->_dbh
933     or return 0;
934
935   return $dbh->FETCH('Active');
936 }
937
938 sub _ping {
939   my $self = shift;
940
941   my $dbh = $self->_dbh or return 0;
942
943   return $dbh->ping;
944 }
945
946 sub ensure_connected {
947   my ($self) = @_;
948
949   unless ($self->connected) {
950     $self->_populate_dbh;
951   }
952 }
953
954 =head2 dbh
955
956 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
957 is guaranteed to be healthy by implicitly calling L</connected>, and if
958 necessary performing a reconnection before returning. Keep in mind that this
959 is very B<expensive> on some database engines. Consider using L</dbh_do>
960 instead.
961
962 =cut
963
964 sub dbh {
965   my ($self) = @_;
966
967   if (not $self->_dbh) {
968     $self->_populate_dbh;
969   } else {
970     $self->ensure_connected;
971   }
972   return $self->_dbh;
973 }
974
975 # this is the internal "get dbh or connect (don't check)" method
976 sub _get_dbh {
977   my $self = shift;
978   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
979   $self->_populate_dbh unless $self->_dbh;
980   return $self->_dbh;
981 }
982
983 sub sql_maker {
984   my ($self) = @_;
985   unless ($self->_sql_maker) {
986     my $sql_maker_class = $self->sql_maker_class;
987
988     my %opts = %{$self->_sql_maker_opts||{}};
989     my $dialect =
990       $opts{limit_dialect}
991         ||
992       $self->sql_limit_dialect
993         ||
994       do {
995         my $s_class = (ref $self) || $self;
996         carp_unique (
997           "Your storage class ($s_class) does not set sql_limit_dialect and you "
998         . 'have not supplied an explicit limit_dialect in your connection_info. '
999         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
1000         . 'databases but can be (and often is) painfully slow. '
1001         . "Please file an RT ticket against '$s_class'"
1002         ) if $self->_dbi_connect_info->[0];
1003
1004         'GenericSubQ';
1005       }
1006     ;
1007
1008     my ($quote_char, $name_sep);
1009
1010     if ($opts{quote_names}) {
1011       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
1012         my $s_class = (ref $self) || $self;
1013         carp_unique (
1014           "You requested 'quote_names' but your storage class ($s_class) does "
1015         . 'not explicitly define a default sql_quote_char and you have not '
1016         . 'supplied a quote_char as part of your connection_info. DBIC will '
1017         .q{default to the ANSI SQL standard quote '"', which works most of }
1018         . "the time. Please file an RT ticket against '$s_class'."
1019         );
1020
1021         '"'; # RV
1022       };
1023
1024       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
1025     }
1026
1027     $self->_sql_maker($sql_maker_class->new(
1028       bindtype=>'columns',
1029       array_datatypes => 1,
1030       limit_dialect => $dialect,
1031       ($quote_char ? (quote_char => $quote_char) : ()),
1032       name_sep => ($name_sep || '.'),
1033       %opts,
1034     ));
1035   }
1036   return $self->_sql_maker;
1037 }
1038
1039 # nothing to do by default
1040 sub _rebless {}
1041 sub _init {}
1042
1043 sub _populate_dbh {
1044   my ($self) = @_;
1045
1046   $self->_dbh(undef); # in case ->connected failed we might get sent here
1047   $self->_dbh_details({}); # reset everything we know
1048
1049   $self->_dbh($self->_connect);
1050
1051   $self->_conn_pid($$) unless DBIx::Class::_ENV_::BROKEN_FORK; # on win32 these are in fact threads
1052
1053   $self->_determine_driver;
1054
1055   # Always set the transaction depth on connect, since
1056   #  there is no transaction in progress by definition
1057   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1058
1059   $self->_run_connection_actions unless $self->{_in_determine_driver};
1060 }
1061
1062 sub _run_connection_actions {
1063   my $self = shift;
1064   my @actions;
1065
1066   push @actions, ( $self->on_connect_call || () );
1067   push @actions, $self->_parse_connect_do ('on_connect_do');
1068
1069   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1070 }
1071
1072
1073
1074 sub set_use_dbms_capability {
1075   $_[0]->set_inherited ($_[1], $_[2]);
1076 }
1077
1078 sub get_use_dbms_capability {
1079   my ($self, $capname) = @_;
1080
1081   my $use = $self->get_inherited ($capname);
1082   return defined $use
1083     ? $use
1084     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1085   ;
1086 }
1087
1088 sub set_dbms_capability {
1089   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1090 }
1091
1092 sub get_dbms_capability {
1093   my ($self, $capname) = @_;
1094
1095   my $cap = $self->_dbh_details->{capability}{$capname};
1096
1097   unless (defined $cap) {
1098     if (my $meth = $self->can ("_determine$capname")) {
1099       $cap = $self->$meth ? 1 : 0;
1100     }
1101     else {
1102       $cap = 0;
1103     }
1104
1105     $self->set_dbms_capability ($capname, $cap);
1106   }
1107
1108   return $cap;
1109 }
1110
1111 sub _server_info {
1112   my $self = shift;
1113
1114   my $info;
1115   unless ($info = $self->_dbh_details->{info}) {
1116
1117     $info = {};
1118
1119     my $server_version = try {
1120       $self->_get_server_version
1121     } catch {
1122       # driver determination *may* use this codepath
1123       # in which case we must rethrow
1124       $self->throw_exception($_) if $self->{_in_determine_driver};
1125
1126       # $server_version on failure
1127       undef;
1128     };
1129
1130     if (defined $server_version) {
1131       $info->{dbms_version} = $server_version;
1132
1133       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1134       my @verparts = split (/\./, $numeric_version);
1135       if (
1136         @verparts
1137           &&
1138         $verparts[0] <= 999
1139       ) {
1140         # consider only up to 3 version parts, iff not more than 3 digits
1141         my @use_parts;
1142         while (@verparts && @use_parts < 3) {
1143           my $p = shift @verparts;
1144           last if $p > 999;
1145           push @use_parts, $p;
1146         }
1147         push @use_parts, 0 while @use_parts < 3;
1148
1149         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1150       }
1151     }
1152
1153     $self->_dbh_details->{info} = $info;
1154   }
1155
1156   return $info;
1157 }
1158
1159 sub _get_server_version {
1160   shift->_dbh_get_info('SQL_DBMS_VER');
1161 }
1162
1163 sub _dbh_get_info {
1164   my ($self, $info) = @_;
1165
1166   if ($info =~ /[^0-9]/) {
1167     require DBI::Const::GetInfoType;
1168     $info = $DBI::Const::GetInfoType::GetInfoType{$info};
1169     $self->throw_exception("Info type '$_[1]' not provided by DBI::Const::GetInfoType")
1170       unless defined $info;
1171   }
1172
1173   $self->_get_dbh->get_info($info);
1174 }
1175
1176 sub _describe_connection {
1177   require DBI::Const::GetInfoReturn;
1178
1179   my $self = shift;
1180
1181   my $drv;
1182   try {
1183     $drv = $self->_extract_driver_from_connect_info;
1184     $self->ensure_connected;
1185   };
1186
1187   $drv = "DBD::$drv" if $drv;
1188
1189   my $res = {
1190     DBIC_DSN => $self->_dbi_connect_info->[0],
1191     DBI_VER => DBI->VERSION,
1192     DBIC_VER => DBIx::Class->VERSION,
1193     DBIC_DRIVER => ref $self,
1194     $drv ? (
1195       DBD => $drv,
1196       DBD_VER => try { $drv->VERSION },
1197     ) : (),
1198   };
1199
1200   # try to grab data even if we never managed to connect
1201   # will cover us in cases of an oddly broken half-connect
1202   for my $inf (
1203     #keys %DBI::Const::GetInfoType::GetInfoType,
1204     qw/
1205       SQL_CURSOR_COMMIT_BEHAVIOR
1206       SQL_CURSOR_ROLLBACK_BEHAVIOR
1207       SQL_CURSOR_SENSITIVITY
1208       SQL_DATA_SOURCE_NAME
1209       SQL_DBMS_NAME
1210       SQL_DBMS_VER
1211       SQL_DEFAULT_TXN_ISOLATION
1212       SQL_DM_VER
1213       SQL_DRIVER_NAME
1214       SQL_DRIVER_ODBC_VER
1215       SQL_DRIVER_VER
1216       SQL_EXPRESSIONS_IN_ORDERBY
1217       SQL_GROUP_BY
1218       SQL_IDENTIFIER_CASE
1219       SQL_IDENTIFIER_QUOTE_CHAR
1220       SQL_MAX_CATALOG_NAME_LEN
1221       SQL_MAX_COLUMN_NAME_LEN
1222       SQL_MAX_IDENTIFIER_LEN
1223       SQL_MAX_TABLE_NAME_LEN
1224       SQL_MULTIPLE_ACTIVE_TXN
1225       SQL_MULT_RESULT_SETS
1226       SQL_NEED_LONG_DATA_LEN
1227       SQL_NON_NULLABLE_COLUMNS
1228       SQL_ODBC_VER
1229       SQL_QUALIFIER_NAME_SEPARATOR
1230       SQL_QUOTED_IDENTIFIER_CASE
1231       SQL_TXN_CAPABLE
1232       SQL_TXN_ISOLATION_OPTION
1233     /
1234   ) {
1235     # some drivers barf on things they do not know about instead
1236     # of returning undef
1237     my $v = try { $self->_dbh_get_info($inf) };
1238     next unless defined $v;
1239
1240     #my $key = sprintf( '%s(%s)', $inf, $DBI::Const::GetInfoType::GetInfoType{$inf} );
1241     my $expl = DBI::Const::GetInfoReturn::Explain($inf, $v);
1242     $res->{$inf} = DBI::Const::GetInfoReturn::Format($inf, $v) . ( $expl ? " ($expl)" : '' );
1243   }
1244
1245   $res;
1246 }
1247
1248 sub _determine_driver {
1249   my ($self) = @_;
1250
1251   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1252     my $started_connected = 0;
1253     local $self->{_in_determine_driver} = 1;
1254
1255     if (ref($self) eq __PACKAGE__) {
1256       my $driver;
1257       if ($self->_dbh) { # we are connected
1258         $driver = $self->_dbh->{Driver}{Name};
1259         $started_connected = 1;
1260       }
1261       else {
1262         $driver = $self->_extract_driver_from_connect_info;
1263       }
1264
1265       if ($driver) {
1266         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1267         if ($self->load_optional_class($storage_class)) {
1268           mro::set_mro($storage_class, 'c3');
1269           bless $self, $storage_class;
1270           $self->_rebless();
1271         }
1272         else {
1273           $self->_warn_undetermined_driver(
1274             'This version of DBIC does not yet seem to supply a driver for '
1275           . "your particular RDBMS and/or connection method ('$driver')."
1276           );
1277         }
1278       }
1279       else {
1280         $self->_warn_undetermined_driver(
1281           'Unable to extract a driver name from connect info - this '
1282         . 'should not have happened.'
1283         );
1284       }
1285     }
1286
1287     $self->_driver_determined(1);
1288
1289     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1290
1291     if ($self->can('source_bind_attributes')) {
1292       $self->throw_exception(
1293         "Your storage subclass @{[ ref $self ]} provides (or inherits) the method "
1294       . 'source_bind_attributes() for which support has been removed as of Jan 2013. '
1295       . 'If you are not sure how to proceed please contact the development team via '
1296       . 'http://search.cpan.org/dist/DBIx-Class/lib/DBIx/Class.pm#GETTING_HELP/SUPPORT'
1297       );
1298     }
1299
1300     $self->_init; # run driver-specific initializations
1301
1302     $self->_run_connection_actions
1303         if !$started_connected && defined $self->_dbh;
1304   }
1305 }
1306
1307 sub _extract_driver_from_connect_info {
1308   my $self = shift;
1309
1310   my $drv;
1311
1312   # if connect_info is a CODEREF, we have no choice but to connect
1313   if (
1314     ref $self->_dbi_connect_info->[0]
1315       and
1316     reftype $self->_dbi_connect_info->[0] eq 'CODE'
1317   ) {
1318     $self->_populate_dbh;
1319     $drv = $self->_dbh->{Driver}{Name};
1320   }
1321   else {
1322     # try to use dsn to not require being connected, the driver may still
1323     # force a connection later in _rebless to determine version
1324     # (dsn may not be supplied at all if all we do is make a mock-schema)
1325     ($drv) = ($self->_dbi_connect_info->[0] || '') =~ /^dbi:([^:]+):/i;
1326     $drv ||= $ENV{DBI_DRIVER};
1327   }
1328
1329   return $drv;
1330 }
1331
1332 sub _determine_connector_driver {
1333   my ($self, $conn) = @_;
1334
1335   my $dbtype = $self->_dbh_get_info('SQL_DBMS_NAME');
1336
1337   if (not $dbtype) {
1338     $self->_warn_undetermined_driver(
1339       'Unable to retrieve RDBMS type (SQL_DBMS_NAME) of the engine behind your '
1340     . "$conn connector - this should not have happened."
1341     );
1342     return;
1343   }
1344
1345   $dbtype =~ s/\W/_/gi;
1346
1347   my $subclass = "DBIx::Class::Storage::DBI::${conn}::${dbtype}";
1348   return if $self->isa($subclass);
1349
1350   if ($self->load_optional_class($subclass)) {
1351     bless $self, $subclass;
1352     $self->_rebless;
1353   }
1354   else {
1355     $self->_warn_undetermined_driver(
1356       'This version of DBIC does not yet seem to supply a driver for '
1357     . "your particular RDBMS and/or connection method ('$conn/$dbtype')."
1358     );
1359   }
1360 }
1361
1362 sub _warn_undetermined_driver {
1363   my ($self, $msg) = @_;
1364
1365   require Data::Dumper::Concise;
1366
1367   carp_once ($msg . ' While we will attempt to continue anyway, the results '
1368   . 'are likely to be underwhelming. Please upgrade DBIC, and if this message '
1369   . "does not go away, file a bugreport including the following info:\n"
1370   . Data::Dumper::Concise::Dumper($self->_describe_connection)
1371   );
1372 }
1373
1374 sub _do_connection_actions {
1375   my $self          = shift;
1376   my $method_prefix = shift;
1377   my $call          = shift;
1378
1379   if (not ref($call)) {
1380     my $method = $method_prefix . $call;
1381     $self->$method(@_);
1382   } elsif (ref($call) eq 'CODE') {
1383     $self->$call(@_);
1384   } elsif (ref($call) eq 'ARRAY') {
1385     if (ref($call->[0]) ne 'ARRAY') {
1386       $self->_do_connection_actions($method_prefix, $_) for @$call;
1387     } else {
1388       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1389     }
1390   } else {
1391     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1392   }
1393
1394   return $self;
1395 }
1396
1397 sub connect_call_do_sql {
1398   my $self = shift;
1399   $self->_do_query(@_);
1400 }
1401
1402 sub disconnect_call_do_sql {
1403   my $self = shift;
1404   $self->_do_query(@_);
1405 }
1406
1407 # override in db-specific backend when necessary
1408 sub connect_call_datetime_setup { 1 }
1409
1410 sub _do_query {
1411   my ($self, $action) = @_;
1412
1413   if (ref $action eq 'CODE') {
1414     $action = $action->($self);
1415     $self->_do_query($_) foreach @$action;
1416   }
1417   else {
1418     # Most debuggers expect ($sql, @bind), so we need to exclude
1419     # the attribute hash which is the second argument to $dbh->do
1420     # furthermore the bind values are usually to be presented
1421     # as named arrayref pairs, so wrap those here too
1422     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1423     my $sql = shift @do_args;
1424     my $attrs = shift @do_args;
1425     my @bind = map { [ undef, $_ ] } @do_args;
1426
1427     $self->dbh_do(sub {
1428       $_[0]->_query_start($sql, \@bind);
1429       $_[1]->do($sql, $attrs, @do_args);
1430       $_[0]->_query_end($sql, \@bind);
1431     });
1432   }
1433
1434   return $self;
1435 }
1436
1437 sub _connect {
1438   my $self = shift;
1439
1440   my $info = $self->_dbi_connect_info;
1441
1442   $self->throw_exception("You did not provide any connection_info")
1443     unless defined $info->[0];
1444
1445   my ($old_connect_via, $dbh);
1446
1447   local $DBI::connect_via = 'connect' if $INC{'Apache/DBI.pm'} && $ENV{MOD_PERL};
1448
1449   # this odd anonymous coderef dereference is in fact really
1450   # necessary to avoid the unwanted effect described in perl5
1451   # RT#75792
1452   #
1453   # in addition the coderef itself can't reside inside the try{} block below
1454   # as it somehow triggers a leak under perl -d
1455   my $dbh_error_handler_installer = sub {
1456     weaken (my $weak_self = $_[0]);
1457
1458     # the coderef is blessed so we can distinguish it from externally
1459     # supplied handles (which must be preserved)
1460     $_[1]->{HandleError} = bless sub {
1461       if ($weak_self) {
1462         $weak_self->throw_exception("DBI Exception: $_[0]");
1463       }
1464       else {
1465         # the handler may be invoked by something totally out of
1466         # the scope of DBIC
1467         DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1468       }
1469     }, '__DBIC__DBH__ERROR__HANDLER__';
1470   };
1471
1472   try {
1473     if(ref $info->[0] eq 'CODE') {
1474       $dbh = $info->[0]->();
1475     }
1476     else {
1477       require DBI;
1478       $dbh = DBI->connect(@$info);
1479     }
1480
1481     die $DBI::errstr unless $dbh;
1482
1483     die sprintf ("%s fresh DBI handle with a *false* 'Active' attribute. "
1484       . 'This handle is disconnected as far as DBIC is concerned, and we can '
1485       . 'not continue',
1486       ref $info->[0] eq 'CODE'
1487         ? "Connection coderef $info->[0] returned a"
1488         : 'DBI->connect($schema->storage->connect_info) resulted in a'
1489     ) unless $dbh->FETCH('Active');
1490
1491     # sanity checks unless asked otherwise
1492     unless ($self->unsafe) {
1493
1494       $self->throw_exception(
1495         'Refusing clobbering of {HandleError} installed on externally supplied '
1496        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1497       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1498
1499       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1500       # request, or an external handle. Complain and set anyway
1501       unless ($dbh->{RaiseError}) {
1502         carp( ref $info->[0] eq 'CODE'
1503
1504           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1505            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1506            .'attribute has been supplied'
1507
1508           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1509            .'unsafe => 1. Toggling RaiseError back to true'
1510         );
1511
1512         $dbh->{RaiseError} = 1;
1513       }
1514
1515       $dbh_error_handler_installer->($self, $dbh);
1516     }
1517   }
1518   catch {
1519     $self->throw_exception("DBI Connection failed: $_")
1520   };
1521
1522   $self->_dbh_autocommit($dbh->{AutoCommit});
1523   return $dbh;
1524 }
1525
1526 sub txn_begin {
1527   my $self = shift;
1528
1529   # this means we have not yet connected and do not know the AC status
1530   # (e.g. coderef $dbh), need a full-fledged connection check
1531   if (! defined $self->_dbh_autocommit) {
1532     $self->ensure_connected;
1533   }
1534   # Otherwise simply connect or re-connect on pid changes
1535   else {
1536     $self->_get_dbh;
1537   }
1538
1539   $self->next::method(@_);
1540 }
1541
1542 sub _exec_txn_begin {
1543   my $self = shift;
1544
1545   # if the user is utilizing txn_do - good for him, otherwise we need to
1546   # ensure that the $dbh is healthy on BEGIN.
1547   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1548   # will be replaced by a failure of begin_work itself (which will be
1549   # then retried on reconnect)
1550   if ($self->{_in_do_block}) {
1551     $self->_dbh->begin_work;
1552   } else {
1553     $self->dbh_do(sub { $_[1]->begin_work });
1554   }
1555 }
1556
1557 sub txn_commit {
1558   my $self = shift;
1559
1560   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1561   $self->throw_exception("Unable to txn_commit() on a disconnected storage")
1562     unless $self->_dbh;
1563
1564   # esoteric case for folks using external $dbh handles
1565   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1566     carp "Storage transaction_depth 0 does not match "
1567         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1568     $self->transaction_depth(1);
1569   }
1570
1571   $self->next::method(@_);
1572
1573   # if AutoCommit is disabled txn_depth never goes to 0
1574   # as a new txn is started immediately on commit
1575   $self->transaction_depth(1) if (
1576     !$self->transaction_depth
1577       and
1578     defined $self->_dbh_autocommit
1579       and
1580     ! $self->_dbh_autocommit
1581   );
1582 }
1583
1584 sub _exec_txn_commit {
1585   shift->_dbh->commit;
1586 }
1587
1588 sub txn_rollback {
1589   my $self = shift;
1590
1591   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1592   $self->throw_exception("Unable to txn_rollback() on a disconnected storage")
1593     unless $self->_dbh;
1594
1595   # esoteric case for folks using external $dbh handles
1596   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1597     carp "Storage transaction_depth 0 does not match "
1598         ."false AutoCommit of $self->{_dbh}, attempting ROLLBACK anyway";
1599     $self->transaction_depth(1);
1600   }
1601
1602   $self->next::method(@_);
1603
1604   # if AutoCommit is disabled txn_depth never goes to 0
1605   # as a new txn is started immediately on commit
1606   $self->transaction_depth(1) if (
1607     !$self->transaction_depth
1608       and
1609     defined $self->_dbh_autocommit
1610       and
1611     ! $self->_dbh_autocommit
1612   );
1613 }
1614
1615 sub _exec_txn_rollback {
1616   shift->_dbh->rollback;
1617 }
1618
1619 # generate some identical methods
1620 for my $meth (qw/svp_begin svp_release svp_rollback/) {
1621   no strict qw/refs/;
1622   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
1623     my $self = shift;
1624     $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1625     $self->throw_exception("Unable to $meth() on a disconnected storage")
1626       unless $self->_dbh;
1627     $self->next::method(@_);
1628   };
1629 }
1630
1631 # This used to be the top-half of _execute.  It was split out to make it
1632 #  easier to override in NoBindVars without duping the rest.  It takes up
1633 #  all of _execute's args, and emits $sql, @bind.
1634 sub _prep_for_execute {
1635   #my ($self, $op, $ident, $args) = @_;
1636   return shift->_gen_sql_bind(@_)
1637 }
1638
1639 sub _gen_sql_bind {
1640   my ($self, $op, $ident, $args) = @_;
1641
1642   my ($colinfos, $from);
1643   if ( blessed($ident) ) {
1644     $from = $ident->from;
1645     $colinfos = $ident->columns_info;
1646   }
1647
1648   my ($sql, $bind);
1649   ($sql, @$bind) = $self->sql_maker->$op( ($from || $ident), @$args );
1650
1651   $bind = $self->_resolve_bindattrs(
1652     $ident, [ @{$args->[2]{bind}||[]}, @$bind ], $colinfos
1653   );
1654
1655   if (
1656     ! $ENV{DBIC_DT_SEARCH_OK}
1657       and
1658     $op eq 'select'
1659       and
1660     first {
1661       length ref $_->[1]
1662         and
1663       blessed($_->[1])
1664         and
1665       $_->[1]->isa('DateTime')
1666     } @$bind
1667   ) {
1668     carp_unique 'DateTime objects passed to search() are not supported '
1669       . 'properly (InflateColumn::DateTime formats and settings are not '
1670       . 'respected.) See "Formatting DateTime objects in queries" in '
1671       . 'DBIx::Class::Manual::Cookbook. To disable this warning for good '
1672       . 'set $ENV{DBIC_DT_SEARCH_OK} to true'
1673   }
1674
1675   return( $sql, $bind );
1676 }
1677
1678 sub _resolve_bindattrs {
1679   my ($self, $ident, $bind, $colinfos) = @_;
1680
1681   $colinfos ||= {};
1682
1683   my $resolve_bindinfo = sub {
1684     #my $infohash = shift;
1685
1686     %$colinfos = %{ $self->_resolve_column_info($ident) }
1687       unless keys %$colinfos;
1688
1689     my $ret;
1690     if (my $col = $_[0]->{dbic_colname}) {
1691       $ret = { %{$_[0]} };
1692
1693       $ret->{sqlt_datatype} ||= $colinfos->{$col}{data_type}
1694         if $colinfos->{$col}{data_type};
1695
1696       $ret->{sqlt_size} ||= $colinfos->{$col}{size}
1697         if $colinfos->{$col}{size};
1698     }
1699
1700     $ret || $_[0];
1701   };
1702
1703   return [ map {
1704     my $resolved =
1705       ( ref $_ ne 'ARRAY' or @$_ != 2 ) ? [ {}, $_ ]
1706     : ( ! defined $_->[0] )             ? [ {}, $_->[1] ]
1707     : (ref $_->[0] eq 'HASH')           ? [ (exists $_->[0]{dbd_attrs} or $_->[0]{sqlt_datatype})
1708                                               ? $_->[0]
1709                                               : $resolve_bindinfo->($_->[0])
1710                                             , $_->[1] ]
1711     : (ref $_->[0] eq 'SCALAR')         ? [ { sqlt_datatype => ${$_->[0]} }, $_->[1] ]
1712     :                                     [ $resolve_bindinfo->(
1713                                               { dbic_colname => $_->[0] }
1714                                             ), $_->[1] ]
1715     ;
1716
1717     if (
1718       ! exists $resolved->[0]{dbd_attrs}
1719         and
1720       ! $resolved->[0]{sqlt_datatype}
1721         and
1722       length ref $resolved->[1]
1723         and
1724       ! overload::Method($resolved->[1], '""')
1725     ) {
1726       require Data::Dumper;
1727       local $Data::Dumper::Maxdepth = 1;
1728       local $Data::Dumper::Terse = 1;
1729       local $Data::Dumper::Useqq = 1;
1730       local $Data::Dumper::Indent = 0;
1731       local $Data::Dumper::Pad = ' ';
1732       $self->throw_exception(
1733         'You must supply a datatype/bindtype (see DBIx::Class::ResultSet/DBIC BIND VALUES) '
1734       . 'for non-scalar value '. Data::Dumper::Dumper ($resolved->[1])
1735       );
1736     }
1737
1738     $resolved;
1739
1740   } @$bind ];
1741 }
1742
1743 sub _format_for_trace {
1744   #my ($self, $bind) = @_;
1745
1746   ### Turn @bind from something like this:
1747   ###   ( [ "artist", 1 ], [ \%attrs, 3 ] )
1748   ### to this:
1749   ###   ( "'1'", "'3'" )
1750
1751   map {
1752     defined( $_ && $_->[1] )
1753       ? qq{'$_->[1]'}
1754       : q{NULL}
1755   } @{$_[1] || []};
1756 }
1757
1758 sub _query_start {
1759   my ( $self, $sql, $bind ) = @_;
1760
1761   $self->debugobj->query_start( $sql, $self->_format_for_trace($bind) )
1762     if $self->debug;
1763 }
1764
1765 sub _query_end {
1766   my ( $self, $sql, $bind ) = @_;
1767
1768   $self->debugobj->query_end( $sql, $self->_format_for_trace($bind) )
1769     if $self->debug;
1770 }
1771
1772 sub _dbi_attrs_for_bind {
1773   my ($self, $ident, $bind) = @_;
1774
1775   my @attrs;
1776
1777   for (map { $_->[0] } @$bind) {
1778     push @attrs, do {
1779       if (exists $_->{dbd_attrs}) {
1780         $_->{dbd_attrs}
1781       }
1782       elsif($_->{sqlt_datatype}) {
1783         # cache the result in the dbh_details hash, as it can not change unless
1784         # we connect to something else
1785         my $cache = $self->_dbh_details->{_datatype_map_cache} ||= {};
1786         if (not exists $cache->{$_->{sqlt_datatype}}) {
1787           $cache->{$_->{sqlt_datatype}} = $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
1788         }
1789         $cache->{$_->{sqlt_datatype}};
1790       }
1791       else {
1792         undef;  # always push something at this position
1793       }
1794     }
1795   }
1796
1797   return \@attrs;
1798 }
1799
1800 sub _execute {
1801   my ($self, $op, $ident, @args) = @_;
1802
1803   my ($sql, $bind) = $self->_prep_for_execute($op, $ident, \@args);
1804
1805   # not even a PID check - we do not care about the state of the _dbh.
1806   # All we need is to get the appropriate drivers loaded if they aren't
1807   # already so that the assumption in ad7c50fc26e holds
1808   $self->_populate_dbh unless $self->_dbh;
1809
1810   $self->dbh_do( _dbh_execute =>     # retry over disconnects
1811     $sql,
1812     $bind,
1813     $self->_dbi_attrs_for_bind($ident, $bind),
1814   );
1815 }
1816
1817 sub _dbh_execute {
1818   my ($self, $dbh, $sql, $bind, $bind_attrs) = @_;
1819
1820   $self->_query_start( $sql, $bind );
1821
1822   my $sth = $self->_bind_sth_params(
1823     $self->_prepare_sth($dbh, $sql),
1824     $bind,
1825     $bind_attrs,
1826   );
1827
1828   # Can this fail without throwing an exception anyways???
1829   my $rv = $sth->execute();
1830   $self->throw_exception(
1831     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1832   ) if !$rv;
1833
1834   $self->_query_end( $sql, $bind );
1835
1836   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1837 }
1838
1839 sub _prepare_sth {
1840   my ($self, $dbh, $sql) = @_;
1841
1842   # 3 is the if_active parameter which avoids active sth re-use
1843   my $sth = $self->disable_sth_caching
1844     ? $dbh->prepare($sql)
1845     : $dbh->prepare_cached($sql, {}, 3);
1846
1847   # XXX You would think RaiseError would make this impossible,
1848   #  but apparently that's not true :(
1849   $self->throw_exception(
1850     $dbh->errstr
1851       ||
1852     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
1853             .'an exception and/or setting $dbh->errstr',
1854       length ($sql) > 20
1855         ? substr($sql, 0, 20) . '...'
1856         : $sql
1857       ,
1858       'DBD::' . $dbh->{Driver}{Name},
1859     )
1860   ) if !$sth;
1861
1862   $sth;
1863 }
1864
1865 sub _bind_sth_params {
1866   my ($self, $sth, $bind, $bind_attrs) = @_;
1867
1868   for my $i (0 .. $#$bind) {
1869     if (ref $bind->[$i][1] eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1870       $sth->bind_param_inout(
1871         $i + 1, # bind params counts are 1-based
1872         $bind->[$i][1],
1873         $bind->[$i][0]{dbd_size} || $self->_max_column_bytesize($bind->[$i][0]), # size
1874         $bind_attrs->[$i],
1875       );
1876     }
1877     else {
1878       # FIXME SUBOPTIMAL - most likely this is not necessary at all
1879       # confirm with dbi-dev whether explicit stringification is needed
1880       my $v = ( length ref $bind->[$i][1] and overload::Method($bind->[$i][1], '""') )
1881         ? "$bind->[$i][1]"
1882         : $bind->[$i][1]
1883       ;
1884       $sth->bind_param(
1885         $i + 1,
1886         $v,
1887         $bind_attrs->[$i],
1888       );
1889     }
1890   }
1891
1892   $sth;
1893 }
1894
1895 sub _prefetch_autovalues {
1896   my ($self, $source, $colinfo, $to_insert) = @_;
1897
1898   my %values;
1899   for my $col (keys %$colinfo) {
1900     if (
1901       $colinfo->{$col}{auto_nextval}
1902         and
1903       (
1904         ! exists $to_insert->{$col}
1905           or
1906         ref $to_insert->{$col} eq 'SCALAR'
1907           or
1908         (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1909       )
1910     ) {
1911       $values{$col} = $self->_sequence_fetch(
1912         'NEXTVAL',
1913         ( $colinfo->{$col}{sequence} ||=
1914             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1915         ),
1916       );
1917     }
1918   }
1919
1920   \%values;
1921 }
1922
1923 sub insert {
1924   my ($self, $source, $to_insert) = @_;
1925
1926   my $col_infos = $source->columns_info;
1927
1928   my $prefetched_values = $self->_prefetch_autovalues($source, $col_infos, $to_insert);
1929
1930   # fuse the values, but keep a separate list of prefetched_values so that
1931   # they can be fused once again with the final return
1932   $to_insert = { %$to_insert, %$prefetched_values };
1933
1934   # FIXME - we seem to assume undef values as non-supplied. This is wrong.
1935   # Investigate what does it take to s/defined/exists/
1936   my %pcols = map { $_ => 1 } $source->primary_columns;
1937   my (%retrieve_cols, $autoinc_supplied, $retrieve_autoinc_col);
1938   for my $col ($source->columns) {
1939     if ($col_infos->{$col}{is_auto_increment}) {
1940       $autoinc_supplied ||= 1 if defined $to_insert->{$col};
1941       $retrieve_autoinc_col ||= $col unless $autoinc_supplied;
1942     }
1943
1944     # nothing to retrieve when explicit values are supplied
1945     next if (defined $to_insert->{$col} and ! (
1946       ref $to_insert->{$col} eq 'SCALAR'
1947         or
1948       (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1949     ));
1950
1951     # the 'scalar keys' is a trick to preserve the ->columns declaration order
1952     $retrieve_cols{$col} = scalar keys %retrieve_cols if (
1953       $pcols{$col}
1954         or
1955       $col_infos->{$col}{retrieve_on_insert}
1956     );
1957   };
1958
1959   local $self->{_autoinc_supplied_for_op} = $autoinc_supplied;
1960   local $self->{_perform_autoinc_retrieval} = $retrieve_autoinc_col;
1961
1962   my ($sqla_opts, @ir_container);
1963   if (%retrieve_cols and $self->_use_insert_returning) {
1964     $sqla_opts->{returning_container} = \@ir_container
1965       if $self->_use_insert_returning_bound;
1966
1967     $sqla_opts->{returning} = [
1968       sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
1969     ];
1970   }
1971
1972   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
1973
1974   my %returned_cols = %$to_insert;
1975   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
1976     @ir_container = try {
1977       local $SIG{__WARN__} = sub {};
1978       my @r = $sth->fetchrow_array;
1979       $sth->finish;
1980       @r;
1981     } unless @ir_container;
1982
1983     @returned_cols{@$retlist} = @ir_container if @ir_container;
1984   }
1985   else {
1986     # pull in PK if needed and then everything else
1987     if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
1988
1989       $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
1990         unless $self->can('last_insert_id');
1991
1992       my @pri_values = $self->last_insert_id($source, @missing_pri);
1993
1994       $self->throw_exception( "Can't get last insert id" )
1995         unless (@pri_values == @missing_pri);
1996
1997       @returned_cols{@missing_pri} = @pri_values;
1998       delete @retrieve_cols{@missing_pri};
1999     }
2000
2001     # if there is more left to pull
2002     if (%retrieve_cols) {
2003       $self->throw_exception(
2004         'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
2005       ) unless %pcols;
2006
2007       my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
2008
2009       my $cur = DBIx::Class::ResultSet->new($source, {
2010         where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
2011         select => \@left_to_fetch,
2012       })->cursor;
2013
2014       @returned_cols{@left_to_fetch} = $cur->next;
2015
2016       $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
2017         if scalar $cur->next;
2018     }
2019   }
2020
2021   return { %$prefetched_values, %returned_cols };
2022 }
2023
2024 sub insert_bulk {
2025   my ($self, $source, $cols, $data) = @_;
2026
2027   my @col_range = (0..$#$cols);
2028
2029   # FIXME SUBOPTIMAL - most likely this is not necessary at all
2030   # confirm with dbi-dev whether explicit stringification is needed
2031   #
2032   # forcibly stringify whatever is stringifiable
2033   # ResultSet::populate() hands us a copy - safe to mangle
2034   for my $r (0 .. $#$data) {
2035     for my $c (0 .. $#{$data->[$r]}) {
2036       $data->[$r][$c] = "$data->[$r][$c]"
2037         if ( length ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
2038     }
2039   }
2040
2041   my $colinfos = $source->columns_info($cols);
2042
2043   local $self->{_autoinc_supplied_for_op} =
2044     (first { $_->{is_auto_increment} } values %$colinfos)
2045       ? 1
2046       : 0
2047   ;
2048
2049   # get a slice type index based on first row of data
2050   # a "column" in this context may refer to more than one bind value
2051   # e.g. \[ '?, ?', [...], [...] ]
2052   #
2053   # construct the value type index - a description of values types for every
2054   # per-column slice of $data:
2055   #
2056   # nonexistent - nonbind literal
2057   # 0 - regular value
2058   # [] of bindattrs - resolved attribute(s) of bind(s) passed via literal+bind \[] combo
2059   #
2060   # also construct the column hash to pass to the SQL generator. For plain
2061   # (non literal) values - convert the members of the first row into a
2062   # literal+bind combo, with extra positional info in the bind attr hashref.
2063   # This will allow us to match the order properly, and is so contrived
2064   # because a user-supplied literal/bind (or something else specific to a
2065   # resultsource and/or storage driver) can inject extra binds along the
2066   # way, so one can't rely on "shift positions" ordering at all. Also we
2067   # can't just hand SQLA a set of some known "values" (e.g. hashrefs that
2068   # can be later matched up by address), because we want to supply a real
2069   # value on which perhaps e.g. datatype checks will be performed
2070   my ($proto_data, $value_type_by_col_idx);
2071   for my $i (@col_range) {
2072     my $colname = $cols->[$i];
2073     if (ref $data->[0][$i] eq 'SCALAR') {
2074       # no bind value at all - no type
2075
2076       $proto_data->{$colname} = $data->[0][$i];
2077     }
2078     elsif (ref $data->[0][$i] eq 'REF' and ref ${$data->[0][$i]} eq 'ARRAY' ) {
2079       # repack, so we don't end up mangling the original \[]
2080       my ($sql, @bind) = @${$data->[0][$i]};
2081
2082       # normalization of user supplied stuff
2083       my $resolved_bind = $self->_resolve_bindattrs(
2084         $source, \@bind, $colinfos,
2085       );
2086
2087       # store value-less (attrs only) bind info - we will be comparing all
2088       # supplied binds against this for sanity
2089       $value_type_by_col_idx->{$i} = [ map { $_->[0] } @$resolved_bind ];
2090
2091       $proto_data->{$colname} = \[ $sql, map { [
2092         # inject slice order to use for $proto_bind construction
2093           { %{$resolved_bind->[$_][0]}, _bind_data_slice_idx => $i, _literal_bind_subindex => $_+1 }
2094             =>
2095           $resolved_bind->[$_][1]
2096         ] } (0 .. $#bind)
2097       ];
2098     }
2099     else {
2100       $value_type_by_col_idx->{$i} = undef;
2101
2102       $proto_data->{$colname} = \[ '?', [
2103         { dbic_colname => $colname, _bind_data_slice_idx => $i }
2104           =>
2105         $data->[0][$i]
2106       ] ];
2107     }
2108   }
2109
2110   my ($sql, $proto_bind) = $self->_prep_for_execute (
2111     'insert',
2112     $source,
2113     [ $proto_data ],
2114   );
2115
2116   if (! @$proto_bind and keys %$value_type_by_col_idx) {
2117     # if the bindlist is empty and we had some dynamic binds, this means the
2118     # storage ate them away (e.g. the NoBindVars component) and interpolated
2119     # them directly into the SQL. This obviously can't be good for multi-inserts
2120     $self->throw_exception('Cannot insert_bulk without support for placeholders');
2121   }
2122
2123   # sanity checks
2124   # FIXME - devise a flag "no babysitting" or somesuch to shut this off
2125   #
2126   # use an error reporting closure for convenience (less to pass)
2127   my $bad_slice_report_cref = sub {
2128     my ($msg, $r_idx, $c_idx) = @_;
2129     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
2130       $msg,
2131       $cols->[$c_idx],
2132       do {
2133         require Data::Dumper::Concise;
2134         local $Data::Dumper::Maxdepth = 5;
2135         Data::Dumper::Concise::Dumper ({
2136           map { $cols->[$_] =>
2137             $data->[$r_idx][$_]
2138           } @col_range
2139         }),
2140       }
2141     );
2142   };
2143
2144   for my $col_idx (@col_range) {
2145     my $reference_val = $data->[0][$col_idx];
2146
2147     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
2148       my $val = $data->[$row_idx][$col_idx];
2149
2150       if (! exists $value_type_by_col_idx->{$col_idx}) { # literal no binds
2151         if (ref $val ne 'SCALAR') {
2152           $bad_slice_report_cref->(
2153             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
2154             $row_idx,
2155             $col_idx,
2156           );
2157         }
2158         elsif ($$val ne $$reference_val) {
2159           $bad_slice_report_cref->(
2160             "Inconsistent literal SQL value (expecting \\'$$reference_val')",
2161             $row_idx,
2162             $col_idx,
2163           );
2164         }
2165       }
2166       elsif (! defined $value_type_by_col_idx->{$col_idx} ) {  # regular non-literal value
2167         if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
2168           $bad_slice_report_cref->("Literal SQL found where a plain bind value is expected", $row_idx, $col_idx);
2169         }
2170       }
2171       else {  # binds from a \[], compare type and attrs
2172         if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
2173           $bad_slice_report_cref->(
2174             "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
2175             $row_idx,
2176             $col_idx,
2177           );
2178         }
2179         # start drilling down and bail out early on identical refs
2180         elsif (
2181           $reference_val != $val
2182             or
2183           $$reference_val != $$val
2184         ) {
2185           if (${$val}->[0] ne ${$reference_val}->[0]) {
2186             $bad_slice_report_cref->(
2187               "Inconsistent literal/bind SQL (expecting \\['${$reference_val}->[0]', ... ])",
2188               $row_idx,
2189               $col_idx,
2190             );
2191           }
2192           # need to check the bind attrs - a bind will happen only once for
2193           # the entire dataset, so any changes further down will be ignored.
2194           elsif (! Data::Compare::Compare(
2195             $value_type_by_col_idx->{$col_idx},
2196             [
2197               map
2198               { $_->[0] }
2199               @{$self->_resolve_bindattrs(
2200                 $source, [ @{$$val}[1 .. $#$$val] ], $colinfos,
2201               )}
2202             ],
2203           )) {
2204             $bad_slice_report_cref->(
2205               'Differing bind attributes on literal/bind values not supported',
2206               $row_idx,
2207               $col_idx,
2208             );
2209           }
2210         }
2211       }
2212     }
2213   }
2214
2215   # neither _dbh_execute_for_fetch, nor _dbh_execute_inserts_with_no_binds
2216   # are atomic (even if execute_for_fetch is a single call). Thus a safety
2217   # scope guard
2218   my $guard = $self->txn_scope_guard;
2219
2220   $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
2221   my $sth = $self->_prepare_sth($self->_dbh, $sql);
2222   my $rv = do {
2223     if (@$proto_bind) {
2224       # proto bind contains the information on which pieces of $data to pull
2225       # $cols is passed in only for prettier error-reporting
2226       $self->_dbh_execute_for_fetch( $source, $sth, $proto_bind, $cols, $data );
2227     }
2228     else {
2229       # bind_param_array doesn't work if there are no binds
2230       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
2231     }
2232   };
2233
2234   $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
2235
2236   $guard->commit;
2237
2238   return wantarray ? ($rv, $sth, @$proto_bind) : $rv;
2239 }
2240
2241 # execute_for_fetch is capable of returning data just fine (it means it
2242 # can be used for INSERT...RETURNING and UPDATE...RETURNING. Since this
2243 # is the void-populate fast-path we will just ignore this altogether
2244 # for the time being.
2245 sub _dbh_execute_for_fetch {
2246   my ($self, $source, $sth, $proto_bind, $cols, $data) = @_;
2247
2248   my @idx_range = ( 0 .. $#$proto_bind );
2249
2250   # If we have any bind attributes to take care of, we will bind the
2251   # proto-bind data (which will never be used by execute_for_fetch)
2252   # However since column bindtypes are "sticky", this is sufficient
2253   # to get the DBD to apply the bindtype to all values later on
2254
2255   my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
2256
2257   for my $i (@idx_range) {
2258     $sth->bind_param (
2259       $i+1, # DBI bind indexes are 1-based
2260       $proto_bind->[$i][1],
2261       $bind_attrs->[$i],
2262     ) if defined $bind_attrs->[$i];
2263   }
2264
2265   # At this point $data slots named in the _bind_data_slice_idx of
2266   # each piece of $proto_bind are either \[]s or plain values to be
2267   # passed in. Construct the dispensing coderef. *NOTE* the order
2268   # of $data will differ from this of the ?s in the SQL (due to
2269   # alphabetical ordering by colname). We actually do want to
2270   # preserve this behavior so that prepare_cached has a better
2271   # chance of matching on unrelated calls
2272
2273   my $fetch_row_idx = -1; # saner loop this way
2274   my $fetch_tuple = sub {
2275     return undef if ++$fetch_row_idx > $#$data;
2276
2277     return [ map { defined $_->{_literal_bind_subindex}
2278       ? ${ $data->[ $fetch_row_idx ]->[ $_->{_bind_data_slice_idx} ]}
2279          ->[ $_->{_literal_bind_subindex} ]
2280           ->[1]
2281       : $data->[ $fetch_row_idx ]->[ $_->{_bind_data_slice_idx} ]
2282     } map { $_->[0] } @$proto_bind];
2283   };
2284
2285   my $tuple_status = [];
2286   my ($rv, $err);
2287   try {
2288     $rv = $sth->execute_for_fetch(
2289       $fetch_tuple,
2290       $tuple_status,
2291     );
2292   }
2293   catch {
2294     $err = shift;
2295   };
2296
2297   # Not all DBDs are create equal. Some throw on error, some return
2298   # an undef $rv, and some set $sth->err - try whatever we can
2299   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
2300     ! defined $err
2301       and
2302     ( !defined $rv or $sth->err )
2303   );
2304
2305   # Statement must finish even if there was an exception.
2306   try {
2307     $sth->finish
2308   }
2309   catch {
2310     $err = shift unless defined $err
2311   };
2312
2313   if (defined $err) {
2314     my $i = 0;
2315     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
2316
2317     $self->throw_exception("Unexpected populate error: $err")
2318       if ($i > $#$tuple_status);
2319
2320     require Data::Dumper::Concise;
2321     $self->throw_exception(sprintf "execute_for_fetch() aborted with '%s' at populate slice:\n%s",
2322       ($tuple_status->[$i][1] || $err),
2323       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
2324     );
2325   }
2326
2327   return $rv;
2328 }
2329
2330 sub _dbh_execute_inserts_with_no_binds {
2331   my ($self, $sth, $count) = @_;
2332
2333   my $err;
2334   try {
2335     my $dbh = $self->_get_dbh;
2336     local $dbh->{RaiseError} = 1;
2337     local $dbh->{PrintError} = 0;
2338
2339     $sth->execute foreach 1..$count;
2340   }
2341   catch {
2342     $err = shift;
2343   };
2344
2345   # Make sure statement is finished even if there was an exception.
2346   try {
2347     $sth->finish
2348   }
2349   catch {
2350     $err = shift unless defined $err;
2351   };
2352
2353   $self->throw_exception($err) if defined $err;
2354
2355   return $count;
2356 }
2357
2358 sub update {
2359   #my ($self, $source, @args) = @_;
2360   shift->_execute('update', @_);
2361 }
2362
2363
2364 sub delete {
2365   #my ($self, $source, @args) = @_;
2366   shift->_execute('delete', @_);
2367 }
2368
2369 sub _select {
2370   my $self = shift;
2371   $self->_execute($self->_select_args(@_));
2372 }
2373
2374 sub _select_args_to_query {
2375   my $self = shift;
2376
2377   $self->throw_exception(
2378     "Unable to generate limited query representation with 'software_limit' enabled"
2379   ) if ($_[3]->{software_limit} and ($_[3]->{offset} or $_[3]->{rows}) );
2380
2381   # my ($op, $ident, $select, $cond, $rs_attrs, $rows, $offset)
2382   #  = $self->_select_args($ident, $select, $cond, $attrs);
2383   my ($op, $ident, @args) =
2384     $self->_select_args(@_);
2385
2386   # my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2387   my ($sql, $bind) = $self->_gen_sql_bind($op, $ident, \@args);
2388
2389   # reuse the bind arrayref
2390   unshift @{$bind}, "($sql)";
2391   \$bind;
2392 }
2393
2394 sub _select_args {
2395   my ($self, $ident, $select, $where, $orig_attrs) = @_;
2396
2397   # FIXME - that kind of caching would be nice to have
2398   # however currently we *may* pass the same $orig_attrs
2399   # with different ident/select/where
2400   # the whole interface needs to be rethought, since it
2401   # was centered around the flawed SQLA API. We can do
2402   # soooooo much better now. But that is also another
2403   # battle...
2404   #return (
2405   #  'select', @{$orig_attrs->{_sqlmaker_select_args}}
2406   #) if $orig_attrs->{_sqlmaker_select_args};
2407
2408   my $sql_maker = $self->sql_maker;
2409   my $alias2source = $self->_resolve_ident_sources ($ident);
2410
2411   my $attrs = {
2412     %$orig_attrs,
2413     select => $select,
2414     from => $ident,
2415     where => $where,
2416
2417     # limit dialects use this stuff
2418     # yes, some CDBICompat crap does not supply an {alias} >.<
2419     ( $orig_attrs->{alias} and $alias2source->{$orig_attrs->{alias}} )
2420       ? ( _rsroot_rsrc => $alias2source->{$orig_attrs->{alias}} )
2421       : ()
2422     ,
2423   };
2424
2425   # Sanity check the attributes (SQLMaker does it too, but
2426   # in case of a software_limit we'll never reach there)
2427   if (defined $attrs->{offset}) {
2428     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2429       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2430   }
2431
2432   if (defined $attrs->{rows}) {
2433     $self->throw_exception("The rows attribute must be a positive integer if present")
2434       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2435   }
2436   elsif ($attrs->{offset}) {
2437     # MySQL actually recommends this approach.  I cringe.
2438     $attrs->{rows} = $sql_maker->__max_int;
2439   }
2440
2441   # see if we will need to tear the prefetch apart to satisfy group_by == select
2442   # this is *extremely tricky* to get right, I am still not sure I did
2443   #
2444   my ($prefetch_needs_subquery, @limit_args);
2445
2446   if ( $attrs->{_grouped_by_distinct} and $attrs->{collapse} ) {
2447     # we already know there is a valid group_by (we made it) and we know it is
2448     # intended to be based *only* on non-multi stuff
2449     # short circuit the group_by parsing below
2450     $prefetch_needs_subquery = 1;
2451   }
2452   elsif (
2453     # The rationale is that even if we do *not* have collapse, we still
2454     # need to wrap the core grouped select/group_by in a subquery
2455     # so that databases that care about group_by/select equivalence
2456     # are happy (this includes MySQL in strict_mode)
2457     # If any of the other joined tables are referenced in the group_by
2458     # however - the user is on their own
2459     ( $prefetch_needs_subquery or $attrs->{_related_results_construction} )
2460       and
2461     $attrs->{group_by}
2462       and
2463     @{$attrs->{group_by}}
2464       and
2465     my $grp_aliases = try { # try{} because $attrs->{from} may be unreadable
2466       $self->_resolve_aliastypes_from_select_args({ from => $attrs->{from}, group_by => $attrs->{group_by} })
2467     }
2468   ) {
2469     # no aliases other than our own in group_by
2470     # if there are - do not allow subquery even if limit is present
2471     $prefetch_needs_subquery = ! scalar grep { $_ ne $attrs->{alias} } keys %{ $grp_aliases->{grouping} || {} };
2472   }
2473   elsif ( $attrs->{rows} && $attrs->{collapse} ) {
2474     # active collapse with a limit - that one is a no-brainer unless
2475     # overruled by a group_by above
2476     $prefetch_needs_subquery = 1;
2477   }
2478
2479   if ($prefetch_needs_subquery) {
2480     $attrs = $self->_adjust_select_args_for_complex_prefetch ($attrs);
2481   }
2482   elsif (! $attrs->{software_limit} ) {
2483     push @limit_args, (
2484       $attrs->{rows} || (),
2485       $attrs->{offset} || (),
2486     );
2487   }
2488
2489   # try to simplify the joinmap further (prune unreferenced type-single joins)
2490   if (
2491     ! $prefetch_needs_subquery  # already pruned
2492       and
2493     ref $attrs->{from}
2494       and
2495     reftype $attrs->{from} eq 'ARRAY'
2496       and
2497     @{$attrs->{from}} != 1
2498   ) {
2499     ($attrs->{from}, $attrs->{_aliastypes}) = $self->_prune_unused_joins ($attrs);
2500   }
2501
2502 ###
2503   # This would be the point to deflate anything found in $attrs->{where}
2504   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2505   # expect a result object. And all we have is a resultsource (it is trivial
2506   # to extract deflator coderefs via $alias2source above).
2507   #
2508   # I don't see a way forward other than changing the way deflators are
2509   # invoked, and that's just bad...
2510 ###
2511
2512   return ( 'select', @{ $orig_attrs->{_sqlmaker_select_args} = [
2513     @{$attrs}{qw(from select where)}, $attrs, @limit_args
2514   ]} );
2515 }
2516
2517 # Returns a counting SELECT for a simple count
2518 # query. Abstracted so that a storage could override
2519 # this to { count => 'firstcol' } or whatever makes
2520 # sense as a performance optimization
2521 sub _count_select {
2522   #my ($self, $source, $rs_attrs) = @_;
2523   return { count => '*' };
2524 }
2525
2526 =head2 select
2527
2528 =over 4
2529
2530 =item Arguments: $ident, $select, $condition, $attrs
2531
2532 =back
2533
2534 Handle a SQL select statement.
2535
2536 =cut
2537
2538 sub select {
2539   my $self = shift;
2540   my ($ident, $select, $condition, $attrs) = @_;
2541   return $self->cursor_class->new($self, \@_, $attrs);
2542 }
2543
2544 sub select_single {
2545   my $self = shift;
2546   my ($rv, $sth, @bind) = $self->_select(@_);
2547   my @row = $sth->fetchrow_array;
2548   my @nextrow = $sth->fetchrow_array if @row;
2549   if(@row && @nextrow) {
2550     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2551   }
2552   # Need to call finish() to work round broken DBDs
2553   $sth->finish();
2554   return @row;
2555 }
2556
2557 =head2 sql_limit_dialect
2558
2559 This is an accessor for the default SQL limit dialect used by a particular
2560 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2561 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2562 see L<DBIx::Class::SQLMaker::LimitDialects>.
2563
2564 =cut
2565
2566 sub _dbh_columns_info_for {
2567   my ($self, $dbh, $table) = @_;
2568
2569   if ($dbh->can('column_info')) {
2570     my %result;
2571     my $caught;
2572     try {
2573       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2574       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2575       $sth->execute();
2576       while ( my $info = $sth->fetchrow_hashref() ){
2577         my %column_info;
2578         $column_info{data_type}   = $info->{TYPE_NAME};
2579         $column_info{size}      = $info->{COLUMN_SIZE};
2580         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2581         $column_info{default_value} = $info->{COLUMN_DEF};
2582         my $col_name = $info->{COLUMN_NAME};
2583         $col_name =~ s/^\"(.*)\"$/$1/;
2584
2585         $result{$col_name} = \%column_info;
2586       }
2587     } catch {
2588       $caught = 1;
2589     };
2590     return \%result if !$caught && scalar keys %result;
2591   }
2592
2593   my %result;
2594   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2595   $sth->execute;
2596   my @columns = @{$sth->{NAME_lc}};
2597   for my $i ( 0 .. $#columns ){
2598     my %column_info;
2599     $column_info{data_type} = $sth->{TYPE}->[$i];
2600     $column_info{size} = $sth->{PRECISION}->[$i];
2601     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2602
2603     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2604       $column_info{data_type} = $1;
2605       $column_info{size}    = $2;
2606     }
2607
2608     $result{$columns[$i]} = \%column_info;
2609   }
2610   $sth->finish;
2611
2612   foreach my $col (keys %result) {
2613     my $colinfo = $result{$col};
2614     my $type_num = $colinfo->{data_type};
2615     my $type_name;
2616     if(defined $type_num && $dbh->can('type_info')) {
2617       my $type_info = $dbh->type_info($type_num);
2618       $type_name = $type_info->{TYPE_NAME} if $type_info;
2619       $colinfo->{data_type} = $type_name if $type_name;
2620     }
2621   }
2622
2623   return \%result;
2624 }
2625
2626 sub columns_info_for {
2627   my ($self, $table) = @_;
2628   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2629 }
2630
2631 =head2 last_insert_id
2632
2633 Return the row id of the last insert.
2634
2635 =cut
2636
2637 sub _dbh_last_insert_id {
2638     my ($self, $dbh, $source, $col) = @_;
2639
2640     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2641
2642     return $id if defined $id;
2643
2644     my $class = ref $self;
2645     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2646 }
2647
2648 sub last_insert_id {
2649   my $self = shift;
2650   $self->_dbh_last_insert_id ($self->_dbh, @_);
2651 }
2652
2653 =head2 _native_data_type
2654
2655 =over 4
2656
2657 =item Arguments: $type_name
2658
2659 =back
2660
2661 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2662 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2663 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2664
2665 The default implementation returns C<undef>, implement in your Storage driver if
2666 you need this functionality.
2667
2668 Should map types from other databases to the native RDBMS type, for example
2669 C<VARCHAR2> to C<VARCHAR>.
2670
2671 Types with modifiers should map to the underlying data type. For example,
2672 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2673
2674 Composite types should map to the container type, for example
2675 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2676
2677 =cut
2678
2679 sub _native_data_type {
2680   #my ($self, $data_type) = @_;
2681   return undef
2682 }
2683
2684 # Check if placeholders are supported at all
2685 sub _determine_supports_placeholders {
2686   my $self = shift;
2687   my $dbh  = $self->_get_dbh;
2688
2689   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2690   # but it is inaccurate more often than not
2691   return try {
2692     local $dbh->{PrintError} = 0;
2693     local $dbh->{RaiseError} = 1;
2694     $dbh->do('select ?', {}, 1);
2695     1;
2696   }
2697   catch {
2698     0;
2699   };
2700 }
2701
2702 # Check if placeholders bound to non-string types throw exceptions
2703 #
2704 sub _determine_supports_typeless_placeholders {
2705   my $self = shift;
2706   my $dbh  = $self->_get_dbh;
2707
2708   return try {
2709     local $dbh->{PrintError} = 0;
2710     local $dbh->{RaiseError} = 1;
2711     # this specifically tests a bind that is NOT a string
2712     $dbh->do('select 1 where 1 = ?', {}, 1);
2713     1;
2714   }
2715   catch {
2716     0;
2717   };
2718 }
2719
2720 =head2 sqlt_type
2721
2722 Returns the database driver name.
2723
2724 =cut
2725
2726 sub sqlt_type {
2727   shift->_get_dbh->{Driver}->{Name};
2728 }
2729
2730 =head2 bind_attribute_by_data_type
2731
2732 Given a datatype from column info, returns a database specific bind
2733 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2734 let the database planner just handle it.
2735
2736 This method is always called after the driver has been determined and a DBI
2737 connection has been established. Therefore you can refer to C<DBI::$constant>
2738 and/or C<DBD::$driver::$constant> directly, without worrying about loading
2739 the correct modules.
2740
2741 =cut
2742
2743 sub bind_attribute_by_data_type {
2744     return;
2745 }
2746
2747 =head2 is_datatype_numeric
2748
2749 Given a datatype from column_info, returns a boolean value indicating if
2750 the current RDBMS considers it a numeric value. This controls how
2751 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2752 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2753 be performed instead of the usual C<eq>.
2754
2755 =cut
2756
2757 sub is_datatype_numeric {
2758   #my ($self, $dt) = @_;
2759
2760   return 0 unless $_[1];
2761
2762   $_[1] =~ /^ (?:
2763     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2764   ) $/ix;
2765 }
2766
2767
2768 =head2 create_ddl_dir
2769
2770 =over 4
2771
2772 =item Arguments: $schema, \@databases, $version, $directory, $preversion, \%sqlt_args
2773
2774 =back
2775
2776 Creates a SQL file based on the Schema, for each of the specified
2777 database engines in C<\@databases> in the given directory.
2778 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2779
2780 Given a previous version number, this will also create a file containing
2781 the ALTER TABLE statements to transform the previous schema into the
2782 current one. Note that these statements may contain C<DROP TABLE> or
2783 C<DROP COLUMN> statements that can potentially destroy data.
2784
2785 The file names are created using the C<ddl_filename> method below, please
2786 override this method in your schema if you would like a different file
2787 name format. For the ALTER file, the same format is used, replacing
2788 $version in the name with "$preversion-$version".
2789
2790 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2791 The most common value for this would be C<< { add_drop_table => 1 } >>
2792 to have the SQL produced include a C<DROP TABLE> statement for each table
2793 created. For quoting purposes supply C<quote_identifiers>.
2794
2795 If no arguments are passed, then the following default values are assumed:
2796
2797 =over 4
2798
2799 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2800
2801 =item version    - $schema->schema_version
2802
2803 =item directory  - './'
2804
2805 =item preversion - <none>
2806
2807 =back
2808
2809 By default, C<\%sqlt_args> will have
2810
2811  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2812
2813 merged with the hash passed in. To disable any of those features, pass in a
2814 hashref like the following
2815
2816  { ignore_constraint_names => 0, # ... other options }
2817
2818
2819 WARNING: You are strongly advised to check all SQL files created, before applying
2820 them.
2821
2822 =cut
2823
2824 sub create_ddl_dir {
2825   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2826
2827   unless ($dir) {
2828     carp "No directory given, using ./\n";
2829     $dir = './';
2830   } else {
2831       -d $dir
2832         or
2833       (require File::Path and File::Path::mkpath (["$dir"]))  # mkpath does not like objects (i.e. Path::Class::Dir)
2834         or
2835       $self->throw_exception(
2836         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2837       );
2838   }
2839
2840   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2841
2842   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2843   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2844
2845   my $schema_version = $schema->schema_version || '1.x';
2846   $version ||= $schema_version;
2847
2848   $sqltargs = {
2849     add_drop_table => 1,
2850     ignore_constraint_names => 1,
2851     ignore_index_names => 1,
2852     %{$sqltargs || {}}
2853   };
2854
2855   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2856     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2857   }
2858
2859   my $sqlt = SQL::Translator->new( $sqltargs );
2860
2861   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2862   my $sqlt_schema = $sqlt->translate({ data => $schema })
2863     or $self->throw_exception ($sqlt->error);
2864
2865   foreach my $db (@$databases) {
2866     $sqlt->reset();
2867     $sqlt->{schema} = $sqlt_schema;
2868     $sqlt->producer($db);
2869
2870     my $file;
2871     my $filename = $schema->ddl_filename($db, $version, $dir);
2872     if (-e $filename && ($version eq $schema_version )) {
2873       # if we are dumping the current version, overwrite the DDL
2874       carp "Overwriting existing DDL file - $filename";
2875       unlink($filename);
2876     }
2877
2878     my $output = $sqlt->translate;
2879     if(!$output) {
2880       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2881       next;
2882     }
2883     if(!open($file, ">$filename")) {
2884       $self->throw_exception("Can't open $filename for writing ($!)");
2885       next;
2886     }
2887     print $file $output;
2888     close($file);
2889
2890     next unless ($preversion);
2891
2892     require SQL::Translator::Diff;
2893
2894     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2895     if(!-e $prefilename) {
2896       carp("No previous schema file found ($prefilename)");
2897       next;
2898     }
2899
2900     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2901     if(-e $difffile) {
2902       carp("Overwriting existing diff file - $difffile");
2903       unlink($difffile);
2904     }
2905
2906     my $source_schema;
2907     {
2908       my $t = SQL::Translator->new($sqltargs);
2909       $t->debug( 0 );
2910       $t->trace( 0 );
2911
2912       $t->parser( $db )
2913         or $self->throw_exception ($t->error);
2914
2915       my $out = $t->translate( $prefilename )
2916         or $self->throw_exception ($t->error);
2917
2918       $source_schema = $t->schema;
2919
2920       $source_schema->name( $prefilename )
2921         unless ( $source_schema->name );
2922     }
2923
2924     # The "new" style of producers have sane normalization and can support
2925     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2926     # And we have to diff parsed SQL against parsed SQL.
2927     my $dest_schema = $sqlt_schema;
2928
2929     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2930       my $t = SQL::Translator->new($sqltargs);
2931       $t->debug( 0 );
2932       $t->trace( 0 );
2933
2934       $t->parser( $db )
2935         or $self->throw_exception ($t->error);
2936
2937       my $out = $t->translate( $filename )
2938         or $self->throw_exception ($t->error);
2939
2940       $dest_schema = $t->schema;
2941
2942       $dest_schema->name( $filename )
2943         unless $dest_schema->name;
2944     }
2945
2946     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2947                                                   $dest_schema,   $db,
2948                                                   $sqltargs
2949                                                  );
2950     if(!open $file, ">$difffile") {
2951       $self->throw_exception("Can't write to $difffile ($!)");
2952       next;
2953     }
2954     print $file $diff;
2955     close($file);
2956   }
2957 }
2958
2959 =head2 deployment_statements
2960
2961 =over 4
2962
2963 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2964
2965 =back
2966
2967 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2968
2969 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2970 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2971
2972 C<$directory> is used to return statements from files in a previously created
2973 L</create_ddl_dir> directory and is optional. The filenames are constructed
2974 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2975
2976 If no C<$directory> is specified then the statements are constructed on the
2977 fly using L<SQL::Translator> and C<$version> is ignored.
2978
2979 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2980
2981 =cut
2982
2983 sub deployment_statements {
2984   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2985   $type ||= $self->sqlt_type;
2986   $version ||= $schema->schema_version || '1.x';
2987   $dir ||= './';
2988   my $filename = $schema->ddl_filename($type, $version, $dir);
2989   if(-f $filename)
2990   {
2991       # FIXME replace this block when a proper sane sql parser is available
2992       my $file;
2993       open($file, "<$filename")
2994         or $self->throw_exception("Can't open $filename ($!)");
2995       my @rows = <$file>;
2996       close($file);
2997       return join('', @rows);
2998   }
2999
3000   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
3001     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
3002   }
3003
3004   # sources needs to be a parser arg, but for simplicity allow at top level
3005   # coming in
3006   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
3007       if exists $sqltargs->{sources};
3008
3009   my $tr = SQL::Translator->new(
3010     producer => "SQL::Translator::Producer::${type}",
3011     %$sqltargs,
3012     parser => 'SQL::Translator::Parser::DBIx::Class',
3013     data => $schema,
3014   );
3015
3016   return preserve_context {
3017     $tr->translate
3018   } after => sub {
3019     $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
3020       unless defined $_[0];
3021   };
3022 }
3023
3024 # FIXME deploy() currently does not accurately report sql errors
3025 # Will always return true while errors are warned
3026 sub deploy {
3027   my ($self, $schema, $type, $sqltargs, $dir) = @_;
3028   my $deploy = sub {
3029     my $line = shift;
3030     return if(!$line);
3031     return if($line =~ /^--/);
3032     # next if($line =~ /^DROP/m);
3033     return if($line =~ /^BEGIN TRANSACTION/m);
3034     return if($line =~ /^COMMIT/m);
3035     return if $line =~ /^\s+$/; # skip whitespace only
3036     $self->_query_start($line);
3037     try {
3038       # do a dbh_do cycle here, as we need some error checking in
3039       # place (even though we will ignore errors)
3040       $self->dbh_do (sub { $_[1]->do($line) });
3041     } catch {
3042       carp qq{$_ (running "${line}")};
3043     };
3044     $self->_query_end($line);
3045   };
3046   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
3047   if (@statements > 1) {
3048     foreach my $statement (@statements) {
3049       $deploy->( $statement );
3050     }
3051   }
3052   elsif (@statements == 1) {
3053     # split on single line comments and end of statements
3054     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
3055       $deploy->( $line );
3056     }
3057   }
3058 }
3059
3060 =head2 datetime_parser
3061
3062 Returns the datetime parser class
3063
3064 =cut
3065
3066 sub datetime_parser {
3067   my $self = shift;
3068   return $self->{datetime_parser} ||= do {
3069     $self->build_datetime_parser(@_);
3070   };
3071 }
3072
3073 =head2 datetime_parser_type
3074
3075 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
3076
3077 =head2 build_datetime_parser
3078
3079 See L</datetime_parser>
3080
3081 =cut
3082
3083 sub build_datetime_parser {
3084   my $self = shift;
3085   my $type = $self->datetime_parser_type(@_);
3086   return $type;
3087 }
3088
3089
3090 =head2 is_replicating
3091
3092 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
3093 replicate from a master database.  Default is undef, which is the result
3094 returned by databases that don't support replication.
3095
3096 =cut
3097
3098 sub is_replicating {
3099     return;
3100
3101 }
3102
3103 =head2 lag_behind_master
3104
3105 Returns a number that represents a certain amount of lag behind a master db
3106 when a given storage is replicating.  The number is database dependent, but
3107 starts at zero and increases with the amount of lag. Default in undef
3108
3109 =cut
3110
3111 sub lag_behind_master {
3112     return;
3113 }
3114
3115 =head2 relname_to_table_alias
3116
3117 =over 4
3118
3119 =item Arguments: $relname, $join_count
3120
3121 =item Return Value: $alias
3122
3123 =back
3124
3125 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
3126 queries.
3127
3128 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
3129 way these aliases are named.
3130
3131 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
3132 otherwise C<"$relname">.
3133
3134 =cut
3135
3136 sub relname_to_table_alias {
3137   my ($self, $relname, $join_count) = @_;
3138
3139   my $alias = ($join_count && $join_count > 1 ?
3140     join('_', $relname, $join_count) : $relname);
3141
3142   return $alias;
3143 }
3144
3145 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
3146 # version and it may be necessary to amend or override it for a specific storage
3147 # if such binds are necessary.
3148 sub _max_column_bytesize {
3149   my ($self, $attr) = @_;
3150
3151   my $max_size;
3152
3153   if ($attr->{sqlt_datatype}) {
3154     my $data_type = lc($attr->{sqlt_datatype});
3155
3156     if ($attr->{sqlt_size}) {
3157
3158       # String/sized-binary types
3159       if ($data_type =~ /^(?:
3160           l? (?:var)? char(?:acter)? (?:\s*varying)?
3161             |
3162           (?:var)? binary (?:\s*varying)?
3163             |
3164           raw
3165         )\b/x
3166       ) {
3167         $max_size = $attr->{sqlt_size};
3168       }
3169       # Other charset/unicode types, assume scale of 4
3170       elsif ($data_type =~ /^(?:
3171           national \s* character (?:\s*varying)?
3172             |
3173           nchar
3174             |
3175           univarchar
3176             |
3177           nvarchar
3178         )\b/x
3179       ) {
3180         $max_size = $attr->{sqlt_size} * 4;
3181       }
3182     }
3183
3184     if (!$max_size and !$self->_is_lob_type($data_type)) {
3185       $max_size = 100 # for all other (numeric?) datatypes
3186     }
3187   }
3188
3189   $max_size || $self->_dbic_connect_attributes->{LongReadLen} || $self->_get_dbh->{LongReadLen} || 8000;
3190 }
3191
3192 # Determine if a data_type is some type of BLOB
3193 sub _is_lob_type {
3194   my ($self, $data_type) = @_;
3195   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
3196     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
3197                                   |varchar|character\s*varying|nvarchar
3198                                   |national\s*character\s*varying))?\z/xi);
3199 }
3200
3201 sub _is_binary_lob_type {
3202   my ($self, $data_type) = @_;
3203   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
3204     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
3205 }
3206
3207 sub _is_text_lob_type {
3208   my ($self, $data_type) = @_;
3209   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
3210     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
3211                         |national\s*character\s*varying))\z/xi);
3212 }
3213
3214 # Determine if a data_type is some type of a binary type
3215 sub _is_binary_type {
3216   my ($self, $data_type) = @_;
3217   $data_type && ($self->_is_binary_lob_type($data_type)
3218     || $data_type =~ /(?:var)?(?:binary|bit|graphic)(?:\s*varying)?/i);
3219 }
3220
3221 1;
3222
3223 =head1 USAGE NOTES
3224
3225 =head2 DBIx::Class and AutoCommit
3226
3227 DBIx::Class can do some wonderful magic with handling exceptions,
3228 disconnections, and transactions when you use C<< AutoCommit => 1 >>
3229 (the default) combined with L<txn_do|DBIx::Class::Storage/txn_do> for
3230 transaction support.
3231
3232 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
3233 in an assumed transaction between commits, and you're telling us you'd
3234 like to manage that manually.  A lot of the magic protections offered by
3235 this module will go away.  We can't protect you from exceptions due to database
3236 disconnects because we don't know anything about how to restart your
3237 transactions.  You're on your own for handling all sorts of exceptional
3238 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
3239 be with raw DBI.
3240
3241
3242 =head1 AUTHOR AND CONTRIBUTORS
3243
3244 See L<AUTHOR|DBIx::Class/AUTHOR> and L<CONTRIBUTORS|DBIx::Class/CONTRIBUTORS> in DBIx::Class
3245
3246 =head1 LICENSE
3247
3248 You may distribute this code under the same terms as Perl itself.
3249
3250 =cut