More robust behavior under heavily threaded environments
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use DBIx::Class::Exception;
12 use Scalar::Util qw/refaddr weaken reftype blessed/;
13 use List::Util qw/first/;
14 use Sub::Name 'subname';
15 use Context::Preserve 'preserve_context';
16 use Try::Tiny;
17 use overload ();
18 use Data::Compare (); # no imports!!! guard against insane architecture
19 use DBI::Const::GetInfoType (); # no import of retarded global hash
20 use namespace::clean;
21
22 # default cursor class, overridable in connect_info attributes
23 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
24
25 __PACKAGE__->mk_group_accessors('inherited' => qw/
26   sql_limit_dialect sql_quote_char sql_name_sep
27 /);
28
29 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
30
31 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
32 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
33
34 __PACKAGE__->sql_name_sep('.');
35
36 __PACKAGE__->mk_group_accessors('simple' => qw/
37   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
38   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts _dbh_autocommit
39   _perform_autoinc_retrieval _autoinc_supplied_for_op
40 /);
41
42 # the values for these accessors are picked out (and deleted) from
43 # the attribute hashref passed to connect_info
44 my @storage_options = qw/
45   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
46   disable_sth_caching unsafe auto_savepoint
47 /;
48 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
49
50
51 # capability definitions, using a 2-tiered accessor system
52 # The rationale is:
53 #
54 # A driver/user may define _use_X, which blindly without any checks says:
55 # "(do not) use this capability", (use_dbms_capability is an "inherited"
56 # type accessor)
57 #
58 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
59 # accessor, which in turn calls _determine_supports_X, and stores the return
60 # in a special slot on the storage object, which is wiped every time a $dbh
61 # reconnection takes place (it is not guaranteed that upon reconnection we
62 # will get the same rdbms version). _determine_supports_X does not need to
63 # exist on a driver, as we ->can for it before calling.
64
65 my @capabilities = (qw/
66   insert_returning
67   insert_returning_bound
68
69   multicolumn_in
70
71   placeholders
72   typeless_placeholders
73
74   join_optimizer
75 /);
76 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
77 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
78
79 # on by default, not strictly a capability (pending rewrite)
80 __PACKAGE__->_use_join_optimizer (1);
81 sub _determine_supports_join_optimizer { 1 };
82
83 # Each of these methods need _determine_driver called before itself
84 # in order to function reliably. This is a purely DRY optimization
85 #
86 # get_(use)_dbms_capability need to be called on the correct Storage
87 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
88 # _determine_supports_X which obv. needs a correct driver as well
89 my @rdbms_specific_methods = qw/
90   deployment_statements
91   sqlt_type
92   sql_maker
93   build_datetime_parser
94   datetime_parser_type
95
96   txn_begin
97   insert
98   insert_bulk
99   update
100   delete
101   select
102   select_single
103   with_deferred_fk_checks
104
105   get_use_dbms_capability
106   get_dbms_capability
107
108   _server_info
109   _get_server_version
110 /;
111
112 for my $meth (@rdbms_specific_methods) {
113
114   my $orig = __PACKAGE__->can ($meth)
115     or die "$meth is not a ::Storage::DBI method!";
116
117   no strict qw/refs/;
118   no warnings qw/redefine/;
119   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
120     if (
121       # only fire when invoked on an instance, a valid class-based invocation
122       # would e.g. be setting a default for an inherited accessor
123       ref $_[0]
124         and
125       ! $_[0]->_driver_determined
126         and
127       ! $_[0]->{_in_determine_driver}
128     ) {
129       $_[0]->_determine_driver;
130
131       # This for some reason crashes and burns on perl 5.8.1
132       # IFF the method ends up throwing an exception
133       #goto $_[0]->can ($meth);
134
135       my $cref = $_[0]->can ($meth);
136       goto $cref;
137     }
138
139     goto $orig;
140   };
141 }
142
143 =head1 NAME
144
145 DBIx::Class::Storage::DBI - DBI storage handler
146
147 =head1 SYNOPSIS
148
149   my $schema = MySchema->connect('dbi:SQLite:my.db');
150
151   $schema->storage->debug(1);
152
153   my @stuff = $schema->storage->dbh_do(
154     sub {
155       my ($storage, $dbh, @args) = @_;
156       $dbh->do("DROP TABLE authors");
157     },
158     @column_list
159   );
160
161   $schema->resultset('Book')->search({
162      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
163   });
164
165 =head1 DESCRIPTION
166
167 This class represents the connection to an RDBMS via L<DBI>.  See
168 L<DBIx::Class::Storage> for general information.  This pod only
169 documents DBI-specific methods and behaviors.
170
171 =head1 METHODS
172
173 =cut
174
175 sub new {
176   my $new = shift->next::method(@_);
177
178   $new->_sql_maker_opts({});
179   $new->_dbh_details({});
180   $new->{_in_do_block} = 0;
181   $new->{_dbh_gen} = 0;
182
183   # read below to see what this does
184   $new->_arm_global_destructor;
185
186   $new;
187 }
188
189 # This is hack to work around perl shooting stuff in random
190 # order on exit(). If we do not walk the remaining storage
191 # objects in an END block, there is a *small but real* chance
192 # of a fork()ed child to kill the parent's shared DBI handle,
193 # *before perl reaches the DESTROY in this package*
194 # Yes, it is ugly and effective.
195 # Additionally this registry is used by the CLONE method to
196 # make sure no handles are shared between threads
197 {
198   my %seek_and_destroy;
199
200   sub _arm_global_destructor {
201     weaken (
202       $seek_and_destroy{ refaddr($_[0]) } = $_[0]
203     );
204   }
205
206   END {
207     local $?; # just in case the DBI destructor changes it somehow
208
209     # destroy just the object if not native to this process
210     $_->_verify_pid for (grep
211       { defined $_ }
212       values %seek_and_destroy
213     );
214   }
215
216   sub CLONE {
217     # As per DBI's recommendation, DBIC disconnects all handles as
218     # soon as possible (DBIC will reconnect only on demand from within
219     # the thread)
220     my @instances = grep { defined $_ } values %seek_and_destroy;
221     for (@instances) {
222       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
223       $_->_dbh(undef);
224
225       $_->transaction_depth(0);
226       $_->savepoints([]);
227     }
228
229     # properly renumber all existing refs
230     %seek_and_destroy = ();
231     $_->_arm_global_destructor for @instances;
232   }
233 }
234
235 sub DESTROY {
236   my $self = shift;
237
238   # some databases spew warnings on implicit disconnect
239   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
240   local $SIG{__WARN__} = sub {};
241   $self->_dbh(undef);
242
243   # this op is necessary, since the very last perl runtime statement
244   # triggers a global destruction shootout, and the $SIG localization
245   # may very well be destroyed before perl actually gets to do the
246   # $dbh undef
247   1;
248 }
249
250 # handle pid changes correctly - do not destroy parent's connection
251 sub _verify_pid {
252   my $self = shift;
253
254   my $pid = $self->_conn_pid;
255   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
256     $dbh->{InactiveDestroy} = 1;
257     $self->{_dbh_gen}++;
258     $self->_dbh(undef);
259     $self->transaction_depth(0);
260     $self->savepoints([]);
261   }
262
263   return;
264 }
265
266 =head2 connect_info
267
268 This method is normally called by L<DBIx::Class::Schema/connection>, which
269 encapsulates its argument list in an arrayref before passing them here.
270
271 The argument list may contain:
272
273 =over
274
275 =item *
276
277 The same 4-element argument set one would normally pass to
278 L<DBI/connect>, optionally followed by
279 L<extra attributes|/DBIx::Class specific connection attributes>
280 recognized by DBIx::Class:
281
282   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
283
284 =item *
285
286 A single code reference which returns a connected
287 L<DBI database handle|DBI/connect> optionally followed by
288 L<extra attributes|/DBIx::Class specific connection attributes> recognized
289 by DBIx::Class:
290
291   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
292
293 =item *
294
295 A single hashref with all the attributes and the dsn/user/password
296 mixed together:
297
298   $connect_info_args = [{
299     dsn => $dsn,
300     user => $user,
301     password => $pass,
302     %dbi_attributes,
303     %extra_attributes,
304   }];
305
306   $connect_info_args = [{
307     dbh_maker => sub { DBI->connect (...) },
308     %dbi_attributes,
309     %extra_attributes,
310   }];
311
312 This is particularly useful for L<Catalyst> based applications, allowing the
313 following config (L<Config::General> style):
314
315   <Model::DB>
316     schema_class   App::DB
317     <connect_info>
318       dsn          dbi:mysql:database=test
319       user         testuser
320       password     TestPass
321       AutoCommit   1
322     </connect_info>
323   </Model::DB>
324
325 The C<dsn>/C<user>/C<password> combination can be substituted by the
326 C<dbh_maker> key whose value is a coderef that returns a connected
327 L<DBI database handle|DBI/connect>
328
329 =back
330
331 Please note that the L<DBI> docs recommend that you always explicitly
332 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
333 recommends that it be set to I<1>, and that you perform transactions
334 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
335 to I<1> if you do not do explicitly set it to zero.  This is the default
336 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
337
338 =head3 DBIx::Class specific connection attributes
339
340 In addition to the standard L<DBI|DBI/ATTRIBUTES COMMON TO ALL HANDLES>
341 L<connection|DBI/Database Handle Attributes> attributes, DBIx::Class recognizes
342 the following connection options. These options can be mixed in with your other
343 L<DBI> connection attributes, or placed in a separate hashref
344 (C<\%extra_attributes>) as shown above.
345
346 Every time C<connect_info> is invoked, any previous settings for
347 these options will be cleared before setting the new ones, regardless of
348 whether any options are specified in the new C<connect_info>.
349
350
351 =over
352
353 =item on_connect_do
354
355 Specifies things to do immediately after connecting or re-connecting to
356 the database.  Its value may contain:
357
358 =over
359
360 =item a scalar
361
362 This contains one SQL statement to execute.
363
364 =item an array reference
365
366 This contains SQL statements to execute in order.  Each element contains
367 a string or a code reference that returns a string.
368
369 =item a code reference
370
371 This contains some code to execute.  Unlike code references within an
372 array reference, its return value is ignored.
373
374 =back
375
376 =item on_disconnect_do
377
378 Takes arguments in the same form as L</on_connect_do> and executes them
379 immediately before disconnecting from the database.
380
381 Note, this only runs if you explicitly call L</disconnect> on the
382 storage object.
383
384 =item on_connect_call
385
386 A more generalized form of L</on_connect_do> that calls the specified
387 C<connect_call_METHOD> methods in your storage driver.
388
389   on_connect_do => 'select 1'
390
391 is equivalent to:
392
393   on_connect_call => [ [ do_sql => 'select 1' ] ]
394
395 Its values may contain:
396
397 =over
398
399 =item a scalar
400
401 Will call the C<connect_call_METHOD> method.
402
403 =item a code reference
404
405 Will execute C<< $code->($storage) >>
406
407 =item an array reference
408
409 Each value can be a method name or code reference.
410
411 =item an array of arrays
412
413 For each array, the first item is taken to be the C<connect_call_> method name
414 or code reference, and the rest are parameters to it.
415
416 =back
417
418 Some predefined storage methods you may use:
419
420 =over
421
422 =item do_sql
423
424 Executes a SQL string or a code reference that returns a SQL string. This is
425 what L</on_connect_do> and L</on_disconnect_do> use.
426
427 It can take:
428
429 =over
430
431 =item a scalar
432
433 Will execute the scalar as SQL.
434
435 =item an arrayref
436
437 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
438 attributes hashref and bind values.
439
440 =item a code reference
441
442 Will execute C<< $code->($storage) >> and execute the return array refs as
443 above.
444
445 =back
446
447 =item datetime_setup
448
449 Execute any statements necessary to initialize the database session to return
450 and accept datetime/timestamp values used with
451 L<DBIx::Class::InflateColumn::DateTime>.
452
453 Only necessary for some databases, see your specific storage driver for
454 implementation details.
455
456 =back
457
458 =item on_disconnect_call
459
460 Takes arguments in the same form as L</on_connect_call> and executes them
461 immediately before disconnecting from the database.
462
463 Calls the C<disconnect_call_METHOD> methods as opposed to the
464 C<connect_call_METHOD> methods called by L</on_connect_call>.
465
466 Note, this only runs if you explicitly call L</disconnect> on the
467 storage object.
468
469 =item disable_sth_caching
470
471 If set to a true value, this option will disable the caching of
472 statement handles via L<DBI/prepare_cached>.
473
474 =item limit_dialect
475
476 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
477 default L</sql_limit_dialect> setting of the storage (if any). For a list
478 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
479
480 =item quote_names
481
482 When true automatically sets L</quote_char> and L</name_sep> to the characters
483 appropriate for your particular RDBMS. This option is preferred over specifying
484 L</quote_char> directly.
485
486 =item quote_char
487
488 Specifies what characters to use to quote table and column names.
489
490 C<quote_char> expects either a single character, in which case is it
491 is placed on either side of the table/column name, or an arrayref of length
492 2 in which case the table/column name is placed between the elements.
493
494 For example under MySQL you should use C<< quote_char => '`' >>, and for
495 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
496
497 =item name_sep
498
499 This parameter is only useful in conjunction with C<quote_char>, and is used to
500 specify the character that separates elements (schemas, tables, columns) from
501 each other. If unspecified it defaults to the most commonly used C<.>.
502
503 =item unsafe
504
505 This Storage driver normally installs its own C<HandleError>, sets
506 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
507 all database handles, including those supplied by a coderef.  It does this
508 so that it can have consistent and useful error behavior.
509
510 If you set this option to a true value, Storage will not do its usual
511 modifications to the database handle's attributes, and instead relies on
512 the settings in your connect_info DBI options (or the values you set in
513 your connection coderef, in the case that you are connecting via coderef).
514
515 Note that your custom settings can cause Storage to malfunction,
516 especially if you set a C<HandleError> handler that suppresses exceptions
517 and/or disable C<RaiseError>.
518
519 =item auto_savepoint
520
521 If this option is true, L<DBIx::Class> will use savepoints when nesting
522 transactions, making it possible to recover from failure in the inner
523 transaction without having to abort all outer transactions.
524
525 =item cursor_class
526
527 Use this argument to supply a cursor class other than the default
528 L<DBIx::Class::Storage::DBI::Cursor>.
529
530 =back
531
532 Some real-life examples of arguments to L</connect_info> and
533 L<DBIx::Class::Schema/connect>
534
535   # Simple SQLite connection
536   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
537
538   # Connect via subref
539   ->connect_info([ sub { DBI->connect(...) } ]);
540
541   # Connect via subref in hashref
542   ->connect_info([{
543     dbh_maker => sub { DBI->connect(...) },
544     on_connect_do => 'alter session ...',
545   }]);
546
547   # A bit more complicated
548   ->connect_info(
549     [
550       'dbi:Pg:dbname=foo',
551       'postgres',
552       'my_pg_password',
553       { AutoCommit => 1 },
554       { quote_char => q{"} },
555     ]
556   );
557
558   # Equivalent to the previous example
559   ->connect_info(
560     [
561       'dbi:Pg:dbname=foo',
562       'postgres',
563       'my_pg_password',
564       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
565     ]
566   );
567
568   # Same, but with hashref as argument
569   # See parse_connect_info for explanation
570   ->connect_info(
571     [{
572       dsn         => 'dbi:Pg:dbname=foo',
573       user        => 'postgres',
574       password    => 'my_pg_password',
575       AutoCommit  => 1,
576       quote_char  => q{"},
577       name_sep    => q{.},
578     }]
579   );
580
581   # Subref + DBIx::Class-specific connection options
582   ->connect_info(
583     [
584       sub { DBI->connect(...) },
585       {
586           quote_char => q{`},
587           name_sep => q{@},
588           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
589           disable_sth_caching => 1,
590       },
591     ]
592   );
593
594
595
596 =cut
597
598 sub connect_info {
599   my ($self, $info) = @_;
600
601   return $self->_connect_info if !$info;
602
603   $self->_connect_info($info); # copy for _connect_info
604
605   $info = $self->_normalize_connect_info($info)
606     if ref $info eq 'ARRAY';
607
608   for my $storage_opt (keys %{ $info->{storage_options} }) {
609     my $value = $info->{storage_options}{$storage_opt};
610
611     $self->$storage_opt($value);
612   }
613
614   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
615   #  the new set of options
616   $self->_sql_maker(undef);
617   $self->_sql_maker_opts({});
618
619   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
620     my $value = $info->{sql_maker_options}{$sql_maker_opt};
621
622     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
623   }
624
625   my %attrs = (
626     %{ $self->_default_dbi_connect_attributes || {} },
627     %{ $info->{attributes} || {} },
628   );
629
630   my @args = @{ $info->{arguments} };
631
632   if (keys %attrs and ref $args[0] ne 'CODE') {
633     carp
634         'You provided explicit AutoCommit => 0 in your connection_info. '
635       . 'This is almost universally a bad idea (see the footnotes of '
636       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
637       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
638       . 'this warning.'
639       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
640
641     push @args, \%attrs if keys %attrs;
642   }
643   $self->_dbi_connect_info(\@args);
644
645   # FIXME - dirty:
646   # save attributes them in a separate accessor so they are always
647   # introspectable, even in case of a CODE $dbhmaker
648   $self->_dbic_connect_attributes (\%attrs);
649
650   return $self->_connect_info;
651 }
652
653 sub _normalize_connect_info {
654   my ($self, $info_arg) = @_;
655   my %info;
656
657   my @args = @$info_arg;  # take a shallow copy for further mutilation
658
659   # combine/pre-parse arguments depending on invocation style
660
661   my %attrs;
662   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
663     %attrs = %{ $args[1] || {} };
664     @args = $args[0];
665   }
666   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
667     %attrs = %{$args[0]};
668     @args = ();
669     if (my $code = delete $attrs{dbh_maker}) {
670       @args = $code;
671
672       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
673       if (@ignored) {
674         carp sprintf (
675             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
676           . "to the result of 'dbh_maker'",
677
678           join (', ', map { "'$_'" } (@ignored) ),
679         );
680       }
681     }
682     else {
683       @args = delete @attrs{qw/dsn user password/};
684     }
685   }
686   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
687     %attrs = (
688       % { $args[3] || {} },
689       % { $args[4] || {} },
690     );
691     @args = @args[0,1,2];
692   }
693
694   $info{arguments} = \@args;
695
696   my @storage_opts = grep exists $attrs{$_},
697     @storage_options, 'cursor_class';
698
699   @{ $info{storage_options} }{@storage_opts} =
700     delete @attrs{@storage_opts} if @storage_opts;
701
702   my @sql_maker_opts = grep exists $attrs{$_},
703     qw/limit_dialect quote_char name_sep quote_names/;
704
705   @{ $info{sql_maker_options} }{@sql_maker_opts} =
706     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
707
708   $info{attributes} = \%attrs if %attrs;
709
710   return \%info;
711 }
712
713 sub _default_dbi_connect_attributes () {
714   +{
715     AutoCommit => 1,
716     PrintError => 0,
717     RaiseError => 1,
718     ShowErrorStatement => 1,
719   };
720 }
721
722 =head2 on_connect_do
723
724 This method is deprecated in favour of setting via L</connect_info>.
725
726 =cut
727
728 =head2 on_disconnect_do
729
730 This method is deprecated in favour of setting via L</connect_info>.
731
732 =cut
733
734 sub _parse_connect_do {
735   my ($self, $type) = @_;
736
737   my $val = $self->$type;
738   return () if not defined $val;
739
740   my @res;
741
742   if (not ref($val)) {
743     push @res, [ 'do_sql', $val ];
744   } elsif (ref($val) eq 'CODE') {
745     push @res, $val;
746   } elsif (ref($val) eq 'ARRAY') {
747     push @res, map { [ 'do_sql', $_ ] } @$val;
748   } else {
749     $self->throw_exception("Invalid type for $type: ".ref($val));
750   }
751
752   return \@res;
753 }
754
755 =head2 dbh_do
756
757 Arguments: ($subref | $method_name), @extra_coderef_args?
758
759 Execute the given $subref or $method_name using the new exception-based
760 connection management.
761
762 The first two arguments will be the storage object that C<dbh_do> was called
763 on and a database handle to use.  Any additional arguments will be passed
764 verbatim to the called subref as arguments 2 and onwards.
765
766 Using this (instead of $self->_dbh or $self->dbh) ensures correct
767 exception handling and reconnection (or failover in future subclasses).
768
769 Your subref should have no side-effects outside of the database, as
770 there is the potential for your subref to be partially double-executed
771 if the database connection was stale/dysfunctional.
772
773 Example:
774
775   my @stuff = $schema->storage->dbh_do(
776     sub {
777       my ($storage, $dbh, @cols) = @_;
778       my $cols = join(q{, }, @cols);
779       $dbh->selectrow_array("SELECT $cols FROM foo");
780     },
781     @column_list
782   );
783
784 =cut
785
786 sub dbh_do {
787   my $self = shift;
788   my $run_target = shift;
789
790   # short circuit when we know there is no need for a runner
791   #
792   # FIXME - asumption may be wrong
793   # the rationale for the txn_depth check is that if this block is a part
794   # of a larger transaction, everything up to that point is screwed anyway
795   return $self->$run_target($self->_get_dbh, @_)
796     if $self->{_in_do_block} or $self->transaction_depth;
797
798   my $args = \@_;
799
800   DBIx::Class::Storage::BlockRunner->new(
801     storage => $self,
802     run_code => sub { $self->$run_target ($self->_get_dbh, @$args ) },
803     wrap_txn => 0,
804     retry_handler => sub { ! ( $_[0]->retried_count or $_[0]->storage->connected ) },
805   )->run;
806 }
807
808 sub txn_do {
809   $_[0]->_get_dbh; # connects or reconnects on pid change, necessary to grab correct txn_depth
810   shift->next::method(@_);
811 }
812
813 =head2 disconnect
814
815 Our C<disconnect> method also performs a rollback first if the
816 database is not in C<AutoCommit> mode.
817
818 =cut
819
820 sub disconnect {
821   my ($self) = @_;
822
823   if( $self->_dbh ) {
824     my @actions;
825
826     push @actions, ( $self->on_disconnect_call || () );
827     push @actions, $self->_parse_connect_do ('on_disconnect_do');
828
829     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
830
831     # stops the "implicit rollback on disconnect" warning
832     $self->_exec_txn_rollback unless $self->_dbh_autocommit;
833
834     %{ $self->_dbh->{CachedKids} } = ();
835     $self->_dbh->disconnect;
836     $self->_dbh(undef);
837     $self->{_dbh_gen}++;
838   }
839 }
840
841 =head2 with_deferred_fk_checks
842
843 =over 4
844
845 =item Arguments: C<$coderef>
846
847 =item Return Value: The return value of $coderef
848
849 =back
850
851 Storage specific method to run the code ref with FK checks deferred or
852 in MySQL's case disabled entirely.
853
854 =cut
855
856 # Storage subclasses should override this
857 sub with_deferred_fk_checks {
858   my ($self, $sub) = @_;
859   $sub->();
860 }
861
862 =head2 connected
863
864 =over
865
866 =item Arguments: none
867
868 =item Return Value: 1|0
869
870 =back
871
872 Verifies that the current database handle is active and ready to execute
873 an SQL statement (e.g. the connection did not get stale, server is still
874 answering, etc.) This method is used internally by L</dbh>.
875
876 =cut
877
878 sub connected {
879   my $self = shift;
880   return 0 unless $self->_seems_connected;
881
882   #be on the safe side
883   local $self->_dbh->{RaiseError} = 1;
884
885   return $self->_ping;
886 }
887
888 sub _seems_connected {
889   my $self = shift;
890
891   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
892
893   my $dbh = $self->_dbh
894     or return 0;
895
896   return $dbh->FETCH('Active');
897 }
898
899 sub _ping {
900   my $self = shift;
901
902   my $dbh = $self->_dbh or return 0;
903
904   return $dbh->ping;
905 }
906
907 sub ensure_connected {
908   my ($self) = @_;
909
910   unless ($self->connected) {
911     $self->_populate_dbh;
912   }
913 }
914
915 =head2 dbh
916
917 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
918 is guaranteed to be healthy by implicitly calling L</connected>, and if
919 necessary performing a reconnection before returning. Keep in mind that this
920 is very B<expensive> on some database engines. Consider using L</dbh_do>
921 instead.
922
923 =cut
924
925 sub dbh {
926   my ($self) = @_;
927
928   if (not $self->_dbh) {
929     $self->_populate_dbh;
930   } else {
931     $self->ensure_connected;
932   }
933   return $self->_dbh;
934 }
935
936 # this is the internal "get dbh or connect (don't check)" method
937 sub _get_dbh {
938   my $self = shift;
939   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
940   $self->_populate_dbh unless $self->_dbh;
941   return $self->_dbh;
942 }
943
944 sub sql_maker {
945   my ($self) = @_;
946   unless ($self->_sql_maker) {
947     my $sql_maker_class = $self->sql_maker_class;
948
949     my %opts = %{$self->_sql_maker_opts||{}};
950     my $dialect =
951       $opts{limit_dialect}
952         ||
953       $self->sql_limit_dialect
954         ||
955       do {
956         my $s_class = (ref $self) || $self;
957         carp (
958           "Your storage class ($s_class) does not set sql_limit_dialect and you "
959         . 'have not supplied an explicit limit_dialect in your connection_info. '
960         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
961         . 'databases but can be (and often is) painfully slow. '
962         . "Please file an RT ticket against '$s_class' ."
963         );
964
965         'GenericSubQ';
966       }
967     ;
968
969     my ($quote_char, $name_sep);
970
971     if ($opts{quote_names}) {
972       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
973         my $s_class = (ref $self) || $self;
974         carp (
975           "You requested 'quote_names' but your storage class ($s_class) does "
976         . 'not explicitly define a default sql_quote_char and you have not '
977         . 'supplied a quote_char as part of your connection_info. DBIC will '
978         .q{default to the ANSI SQL standard quote '"', which works most of }
979         . "the time. Please file an RT ticket against '$s_class'."
980         );
981
982         '"'; # RV
983       };
984
985       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
986     }
987
988     $self->_sql_maker($sql_maker_class->new(
989       bindtype=>'columns',
990       array_datatypes => 1,
991       limit_dialect => $dialect,
992       ($quote_char ? (quote_char => $quote_char) : ()),
993       name_sep => ($name_sep || '.'),
994       %opts,
995     ));
996   }
997   return $self->_sql_maker;
998 }
999
1000 # nothing to do by default
1001 sub _rebless {}
1002 sub _init {}
1003
1004 sub _populate_dbh {
1005   my ($self) = @_;
1006
1007   my @info = @{$self->_dbi_connect_info || []};
1008   $self->_dbh(undef); # in case ->connected failed we might get sent here
1009   $self->_dbh_details({}); # reset everything we know
1010
1011   $self->_dbh($self->_connect(@info));
1012
1013   $self->_conn_pid($$) unless DBIx::Class::_ENV_::BROKEN_FORK; # on win32 these are in fact threads
1014
1015   $self->_determine_driver;
1016
1017   # Always set the transaction depth on connect, since
1018   #  there is no transaction in progress by definition
1019   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1020
1021   $self->_run_connection_actions unless $self->{_in_determine_driver};
1022 }
1023
1024 sub _run_connection_actions {
1025   my $self = shift;
1026   my @actions;
1027
1028   push @actions, ( $self->on_connect_call || () );
1029   push @actions, $self->_parse_connect_do ('on_connect_do');
1030
1031   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1032 }
1033
1034
1035
1036 sub set_use_dbms_capability {
1037   $_[0]->set_inherited ($_[1], $_[2]);
1038 }
1039
1040 sub get_use_dbms_capability {
1041   my ($self, $capname) = @_;
1042
1043   my $use = $self->get_inherited ($capname);
1044   return defined $use
1045     ? $use
1046     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1047   ;
1048 }
1049
1050 sub set_dbms_capability {
1051   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1052 }
1053
1054 sub get_dbms_capability {
1055   my ($self, $capname) = @_;
1056
1057   my $cap = $self->_dbh_details->{capability}{$capname};
1058
1059   unless (defined $cap) {
1060     if (my $meth = $self->can ("_determine$capname")) {
1061       $cap = $self->$meth ? 1 : 0;
1062     }
1063     else {
1064       $cap = 0;
1065     }
1066
1067     $self->set_dbms_capability ($capname, $cap);
1068   }
1069
1070   return $cap;
1071 }
1072
1073 sub _server_info {
1074   my $self = shift;
1075
1076   my $info;
1077   unless ($info = $self->_dbh_details->{info}) {
1078
1079     $info = {};
1080
1081     my $server_version;
1082     try {
1083       $server_version = $self->_get_server_version;
1084     }
1085     catch {
1086       if ($self->{_in_determine_driver}) {
1087         $self->throw_exception($_);
1088       }
1089       $server_version = undef;
1090     };
1091
1092     if (defined $server_version) {
1093       $info->{dbms_version} = $server_version;
1094
1095       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1096       my @verparts = split (/\./, $numeric_version);
1097       if (
1098         @verparts
1099           &&
1100         $verparts[0] <= 999
1101       ) {
1102         # consider only up to 3 version parts, iff not more than 3 digits
1103         my @use_parts;
1104         while (@verparts && @use_parts < 3) {
1105           my $p = shift @verparts;
1106           last if $p > 999;
1107           push @use_parts, $p;
1108         }
1109         push @use_parts, 0 while @use_parts < 3;
1110
1111         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1112       }
1113     }
1114
1115     $self->_dbh_details->{info} = $info;
1116   }
1117
1118   return $info;
1119 }
1120
1121 sub _get_server_version {
1122   shift->_dbh_get_info('SQL_DBMS_VER');
1123 }
1124
1125 sub _dbh_get_info {
1126   my ($self, $info) = @_;
1127
1128   if ($info =~ /[^0-9]/) {
1129     $info = $DBI::Const::GetInfoType::GetInfoType{$info};
1130     $self->throw_exception("Info type '$_[1]' not provided by DBI::Const::GetInfoType")
1131       unless defined $info;
1132   }
1133
1134   my $res;
1135
1136   try {
1137     $res = $self->_get_dbh->get_info($info);
1138   }
1139   catch {
1140     if ($self->{_in_determine_driver}) {
1141       $self->throw_exception($_);
1142     }
1143     $res = undef;
1144   };
1145
1146   return $res;
1147 }
1148
1149 sub _describe_connection {
1150   require DBI::Const::GetInfoReturn;
1151
1152   my $self = shift;
1153   $self->ensure_connected;
1154
1155   my $res = {
1156     DBIC_DSN => $self->_dbi_connect_info->[0],
1157     DBI_VER => DBI->VERSION,
1158     DBIC_VER => DBIx::Class->VERSION,
1159     DBIC_DRIVER => ref $self,
1160   };
1161
1162   for my $inf (
1163     #keys %DBI::Const::GetInfoType::GetInfoType,
1164     qw/
1165       SQL_CURSOR_COMMIT_BEHAVIOR
1166       SQL_CURSOR_ROLLBACK_BEHAVIOR
1167       SQL_CURSOR_SENSITIVITY
1168       SQL_DATA_SOURCE_NAME
1169       SQL_DBMS_NAME
1170       SQL_DBMS_VER
1171       SQL_DEFAULT_TXN_ISOLATION
1172       SQL_DM_VER
1173       SQL_DRIVER_NAME
1174       SQL_DRIVER_ODBC_VER
1175       SQL_DRIVER_VER
1176       SQL_EXPRESSIONS_IN_ORDERBY
1177       SQL_GROUP_BY
1178       SQL_IDENTIFIER_CASE
1179       SQL_IDENTIFIER_QUOTE_CHAR
1180       SQL_MAX_CATALOG_NAME_LEN
1181       SQL_MAX_COLUMN_NAME_LEN
1182       SQL_MAX_IDENTIFIER_LEN
1183       SQL_MAX_TABLE_NAME_LEN
1184       SQL_MULTIPLE_ACTIVE_TXN
1185       SQL_MULT_RESULT_SETS
1186       SQL_NEED_LONG_DATA_LEN
1187       SQL_NON_NULLABLE_COLUMNS
1188       SQL_ODBC_VER
1189       SQL_QUALIFIER_NAME_SEPARATOR
1190       SQL_QUOTED_IDENTIFIER_CASE
1191       SQL_TXN_CAPABLE
1192       SQL_TXN_ISOLATION_OPTION
1193     /
1194   ) {
1195     my $v = $self->_dbh_get_info($inf);
1196     next unless defined $v;
1197
1198     #my $key = sprintf( '%s(%s)', $inf, $DBI::Const::GetInfoType::GetInfoType{$inf} );
1199     my $expl = DBI::Const::GetInfoReturn::Explain($inf, $v);
1200     $res->{$inf} = DBI::Const::GetInfoReturn::Format($inf, $v) . ( $expl ? " ($expl)" : '' );
1201   }
1202
1203   $res;
1204 }
1205
1206 sub _determine_driver {
1207   my ($self) = @_;
1208
1209   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1210     my $started_connected = 0;
1211     local $self->{_in_determine_driver} = 1;
1212
1213     if (ref($self) eq __PACKAGE__) {
1214       my $driver;
1215       if ($self->_dbh) { # we are connected
1216         $driver = $self->_dbh->{Driver}{Name};
1217         $started_connected = 1;
1218       }
1219       else {
1220         # if connect_info is a CODEREF, we have no choice but to connect
1221         if (ref $self->_dbi_connect_info->[0] &&
1222             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1223           $self->_populate_dbh;
1224           $driver = $self->_dbh->{Driver}{Name};
1225         }
1226         else {
1227           # try to use dsn to not require being connected, the driver may still
1228           # force a connection in _rebless to determine version
1229           # (dsn may not be supplied at all if all we do is make a mock-schema)
1230           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1231           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1232           $driver ||= $ENV{DBI_DRIVER};
1233         }
1234       }
1235
1236       if ($driver) {
1237         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1238         if ($self->load_optional_class($storage_class)) {
1239           mro::set_mro($storage_class, 'c3');
1240           bless $self, $storage_class;
1241           $self->_rebless();
1242         }
1243         else {
1244           $self->_warn_undetermined_driver(
1245             'This version of DBIC does not yet seem to supply a driver for '
1246           . "your particular RDBMS and/or connection method ('$driver')."
1247           );
1248         }
1249       }
1250       else {
1251         $self->_warn_undetermined_driver(
1252           'Unable to extract a driver name from connect info - this '
1253         . 'should not have happened.'
1254         );
1255       }
1256     }
1257
1258     $self->_driver_determined(1);
1259
1260     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1261
1262     $self->_init; # run driver-specific initializations
1263
1264     $self->_run_connection_actions
1265         if !$started_connected && defined $self->_dbh;
1266   }
1267 }
1268
1269 sub _determine_connector_driver {
1270   my ($self, $conn) = @_;
1271
1272   my $dbtype = $self->_dbh_get_info('SQL_DBMS_NAME');
1273
1274   if (not $dbtype) {
1275     $self->_warn_undetermined_driver(
1276       'Unable to retrieve RDBMS type (SQL_DBMS_NAME) of the engine behind your '
1277     . "$conn connector - this should not have happened."
1278     );
1279     return;
1280   }
1281
1282   $dbtype =~ s/\W/_/gi;
1283
1284   my $subclass = "DBIx::Class::Storage::DBI::${conn}::${dbtype}";
1285   return if $self->isa($subclass);
1286
1287   if ($self->load_optional_class($subclass)) {
1288     bless $self, $subclass;
1289     $self->_rebless;
1290   }
1291   else {
1292     $self->_warn_undetermined_driver(
1293       'This version of DBIC does not yet seem to supply a driver for '
1294     . "your particular RDBMS and/or connection method ('$conn/$dbtype')."
1295     );
1296   }
1297 }
1298
1299 sub _warn_undetermined_driver {
1300   my ($self, $msg) = @_;
1301
1302   require Data::Dumper::Concise;
1303
1304   carp_once ($msg . ' While we will attempt to continue anyway, the results '
1305   . 'are likely to be underwhelming. Please upgrade DBIC, and if this message '
1306   . "does not go away, file a bugreport including the following info:\n"
1307   . Data::Dumper::Concise::Dumper($self->_describe_connection)
1308   );
1309 }
1310
1311 sub _do_connection_actions {
1312   my $self          = shift;
1313   my $method_prefix = shift;
1314   my $call          = shift;
1315
1316   if (not ref($call)) {
1317     my $method = $method_prefix . $call;
1318     $self->$method(@_);
1319   } elsif (ref($call) eq 'CODE') {
1320     $self->$call(@_);
1321   } elsif (ref($call) eq 'ARRAY') {
1322     if (ref($call->[0]) ne 'ARRAY') {
1323       $self->_do_connection_actions($method_prefix, $_) for @$call;
1324     } else {
1325       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1326     }
1327   } else {
1328     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1329   }
1330
1331   return $self;
1332 }
1333
1334 sub connect_call_do_sql {
1335   my $self = shift;
1336   $self->_do_query(@_);
1337 }
1338
1339 sub disconnect_call_do_sql {
1340   my $self = shift;
1341   $self->_do_query(@_);
1342 }
1343
1344 # override in db-specific backend when necessary
1345 sub connect_call_datetime_setup { 1 }
1346
1347 sub _do_query {
1348   my ($self, $action) = @_;
1349
1350   if (ref $action eq 'CODE') {
1351     $action = $action->($self);
1352     $self->_do_query($_) foreach @$action;
1353   }
1354   else {
1355     # Most debuggers expect ($sql, @bind), so we need to exclude
1356     # the attribute hash which is the second argument to $dbh->do
1357     # furthermore the bind values are usually to be presented
1358     # as named arrayref pairs, so wrap those here too
1359     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1360     my $sql = shift @do_args;
1361     my $attrs = shift @do_args;
1362     my @bind = map { [ undef, $_ ] } @do_args;
1363
1364     $self->dbh_do(sub {
1365       $_[0]->_query_start($sql, \@bind);
1366       $_[1]->do($sql, $attrs, @do_args);
1367       $_[0]->_query_end($sql, \@bind);
1368     });
1369   }
1370
1371   return $self;
1372 }
1373
1374 sub _connect {
1375   my ($self, @info) = @_;
1376
1377   $self->throw_exception("You failed to provide any connection info")
1378     if !@info;
1379
1380   my ($old_connect_via, $dbh);
1381
1382   local $DBI::connect_via = 'connect' if $INC{'Apache/DBI.pm'} && $ENV{MOD_PERL};
1383
1384   try {
1385     if(ref $info[0] eq 'CODE') {
1386       $dbh = $info[0]->();
1387     }
1388     else {
1389       require DBI;
1390       $dbh = DBI->connect(@info);
1391     }
1392
1393     if (!$dbh) {
1394       die $DBI::errstr;
1395     }
1396
1397     unless ($self->unsafe) {
1398
1399       $self->throw_exception(
1400         'Refusing clobbering of {HandleError} installed on externally supplied '
1401        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1402       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1403
1404       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1405       # request, or an external handle. Complain and set anyway
1406       unless ($dbh->{RaiseError}) {
1407         carp( ref $info[0] eq 'CODE'
1408
1409           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1410            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1411            .'attribute has been supplied'
1412
1413           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1414            .'unsafe => 1. Toggling RaiseError back to true'
1415         );
1416
1417         $dbh->{RaiseError} = 1;
1418       }
1419
1420       # this odd anonymous coderef dereference is in fact really
1421       # necessary to avoid the unwanted effect described in perl5
1422       # RT#75792
1423       sub {
1424         my $weak_self = $_[0];
1425         weaken $weak_self;
1426
1427         # the coderef is blessed so we can distinguish it from externally
1428         # supplied handles (which must be preserved)
1429         $_[1]->{HandleError} = bless sub {
1430           if ($weak_self) {
1431             $weak_self->throw_exception("DBI Exception: $_[0]");
1432           }
1433           else {
1434             # the handler may be invoked by something totally out of
1435             # the scope of DBIC
1436             DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1437           }
1438         }, '__DBIC__DBH__ERROR__HANDLER__';
1439       }->($self, $dbh);
1440     }
1441   }
1442   catch {
1443     $self->throw_exception("DBI Connection failed: $_")
1444   };
1445
1446   $self->_dbh_autocommit($dbh->{AutoCommit});
1447   $dbh;
1448 }
1449
1450 sub txn_begin {
1451   my $self = shift;
1452
1453   # this means we have not yet connected and do not know the AC status
1454   # (e.g. coderef $dbh), need a full-fledged connection check
1455   if (! defined $self->_dbh_autocommit) {
1456     $self->ensure_connected;
1457   }
1458   # Otherwise simply connect or re-connect on pid changes
1459   else {
1460     $self->_get_dbh;
1461   }
1462
1463   $self->next::method(@_);
1464 }
1465
1466 sub _exec_txn_begin {
1467   my $self = shift;
1468
1469   # if the user is utilizing txn_do - good for him, otherwise we need to
1470   # ensure that the $dbh is healthy on BEGIN.
1471   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1472   # will be replaced by a failure of begin_work itself (which will be
1473   # then retried on reconnect)
1474   if ($self->{_in_do_block}) {
1475     $self->_dbh->begin_work;
1476   } else {
1477     $self->dbh_do(sub { $_[1]->begin_work });
1478   }
1479 }
1480
1481 sub txn_commit {
1482   my $self = shift;
1483
1484   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1485   $self->throw_exception("Unable to txn_commit() on a disconnected storage")
1486     unless $self->_dbh;
1487
1488   # esoteric case for folks using external $dbh handles
1489   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1490     carp "Storage transaction_depth 0 does not match "
1491         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1492     $self->transaction_depth(1);
1493   }
1494
1495   $self->next::method(@_);
1496
1497   # if AutoCommit is disabled txn_depth never goes to 0
1498   # as a new txn is started immediately on commit
1499   $self->transaction_depth(1) if (
1500     !$self->transaction_depth
1501       and
1502     defined $self->_dbh_autocommit
1503       and
1504     ! $self->_dbh_autocommit
1505   );
1506 }
1507
1508 sub _exec_txn_commit {
1509   shift->_dbh->commit;
1510 }
1511
1512 sub txn_rollback {
1513   my $self = shift;
1514
1515   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1516   $self->throw_exception("Unable to txn_rollback() on a disconnected storage")
1517     unless $self->_dbh;
1518
1519   # esoteric case for folks using external $dbh handles
1520   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1521     carp "Storage transaction_depth 0 does not match "
1522         ."false AutoCommit of $self->{_dbh}, attempting ROLLBACK anyway";
1523     $self->transaction_depth(1);
1524   }
1525
1526   $self->next::method(@_);
1527
1528   # if AutoCommit is disabled txn_depth never goes to 0
1529   # as a new txn is started immediately on commit
1530   $self->transaction_depth(1) if (
1531     !$self->transaction_depth
1532       and
1533     defined $self->_dbh_autocommit
1534       and
1535     ! $self->_dbh_autocommit
1536   );
1537 }
1538
1539 sub _exec_txn_rollback {
1540   shift->_dbh->rollback;
1541 }
1542
1543 # generate some identical methods
1544 for my $meth (qw/svp_begin svp_release svp_rollback/) {
1545   no strict qw/refs/;
1546   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
1547     my $self = shift;
1548     $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1549     $self->throw_exception("Unable to $meth() on a disconnected storage")
1550       unless $self->_dbh;
1551     $self->next::method(@_);
1552   };
1553 }
1554
1555 # This used to be the top-half of _execute.  It was split out to make it
1556 #  easier to override in NoBindVars without duping the rest.  It takes up
1557 #  all of _execute's args, and emits $sql, @bind.
1558 sub _prep_for_execute {
1559   #my ($self, $op, $ident, $args) = @_;
1560   return shift->_gen_sql_bind(@_)
1561 }
1562
1563 sub _gen_sql_bind {
1564   my ($self, $op, $ident, $args) = @_;
1565
1566   my ($sql, @bind) = $self->sql_maker->$op(
1567     blessed($ident) ? $ident->from : $ident,
1568     @$args,
1569   );
1570
1571   if (
1572     ! $ENV{DBIC_DT_SEARCH_OK}
1573       and
1574     $op eq 'select'
1575       and
1576     first { blessed($_->[1]) && $_->[1]->isa('DateTime') } @bind
1577   ) {
1578     carp_unique 'DateTime objects passed to search() are not supported '
1579       . 'properly (InflateColumn::DateTime formats and settings are not '
1580       . 'respected.) See "Formatting DateTime objects in queries" in '
1581       . 'DBIx::Class::Manual::Cookbook. To disable this warning for good '
1582       . 'set $ENV{DBIC_DT_SEARCH_OK} to true'
1583   }
1584
1585   return( $sql, $self->_resolve_bindattrs(
1586     $ident, [ @{$args->[2]{bind}||[]}, @bind ]
1587   ));
1588 }
1589
1590 sub _resolve_bindattrs {
1591   my ($self, $ident, $bind, $colinfos) = @_;
1592
1593   $colinfos ||= {};
1594
1595   my $resolve_bindinfo = sub {
1596     #my $infohash = shift;
1597
1598     %$colinfos = %{ $self->_resolve_column_info($ident) }
1599       unless keys %$colinfos;
1600
1601     my $ret;
1602     if (my $col = $_[0]->{dbic_colname}) {
1603       $ret = { %{$_[0]} };
1604
1605       $ret->{sqlt_datatype} ||= $colinfos->{$col}{data_type}
1606         if $colinfos->{$col}{data_type};
1607
1608       $ret->{sqlt_size} ||= $colinfos->{$col}{size}
1609         if $colinfos->{$col}{size};
1610     }
1611
1612     $ret || $_[0];
1613   };
1614
1615   return [ map {
1616     if (ref $_ ne 'ARRAY') {
1617       [{}, $_]
1618     }
1619     elsif (! defined $_->[0]) {
1620       [{}, $_->[1]]
1621     }
1622     elsif (ref $_->[0] eq 'HASH') {
1623       [
1624         ($_->[0]{dbd_attrs} or $_->[0]{sqlt_datatype}) ? $_->[0] : $resolve_bindinfo->($_->[0]),
1625         $_->[1]
1626       ]
1627     }
1628     elsif (ref $_->[0] eq 'SCALAR') {
1629       [ { sqlt_datatype => ${$_->[0]} }, $_->[1] ]
1630     }
1631     else {
1632       [ $resolve_bindinfo->({ dbic_colname => $_->[0] }), $_->[1] ]
1633     }
1634   } @$bind ];
1635 }
1636
1637 sub _format_for_trace {
1638   #my ($self, $bind) = @_;
1639
1640   ### Turn @bind from something like this:
1641   ###   ( [ "artist", 1 ], [ \%attrs, 3 ] )
1642   ### to this:
1643   ###   ( "'1'", "'3'" )
1644
1645   map {
1646     defined( $_ && $_->[1] )
1647       ? qq{'$_->[1]'}
1648       : q{NULL}
1649   } @{$_[1] || []};
1650 }
1651
1652 sub _query_start {
1653   my ( $self, $sql, $bind ) = @_;
1654
1655   $self->debugobj->query_start( $sql, $self->_format_for_trace($bind) )
1656     if $self->debug;
1657 }
1658
1659 sub _query_end {
1660   my ( $self, $sql, $bind ) = @_;
1661
1662   $self->debugobj->query_end( $sql, $self->_format_for_trace($bind) )
1663     if $self->debug;
1664 }
1665
1666 my $sba_compat;
1667 sub _dbi_attrs_for_bind {
1668   my ($self, $ident, $bind) = @_;
1669
1670   if (! defined $sba_compat) {
1671     $self->_determine_driver;
1672     $sba_compat = $self->can('source_bind_attributes') == \&source_bind_attributes
1673       ? 0
1674       : 1
1675     ;
1676   }
1677
1678   my $sba_attrs;
1679   if ($sba_compat) {
1680     my $class = ref $self;
1681     carp_unique (
1682       "The source_bind_attributes() override in $class relies on a deprecated codepath. "
1683      .'You are strongly advised to switch your code to override bind_attribute_by_datatype() '
1684      .'instead. This legacy compat shim will also disappear some time before DBIC 0.09'
1685     );
1686
1687     my $sba_attrs = $self->source_bind_attributes
1688   }
1689
1690   my @attrs;
1691
1692   for (map { $_->[0] } @$bind) {
1693     push @attrs, do {
1694       if (exists $_->{dbd_attrs}) {
1695         $_->{dbd_attrs}
1696       }
1697       elsif($_->{sqlt_datatype}) {
1698         # cache the result in the dbh_details hash, as it can not change unless
1699         # we connect to something else
1700         my $cache = $self->_dbh_details->{_datatype_map_cache} ||= {};
1701         if (not exists $cache->{$_->{sqlt_datatype}}) {
1702           $cache->{$_->{sqlt_datatype}} = $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
1703         }
1704         $cache->{$_->{sqlt_datatype}};
1705       }
1706       elsif ($sba_attrs and $_->{dbic_colname}) {
1707         $sba_attrs->{$_->{dbic_colname}} || undef;
1708       }
1709       else {
1710         undef;  # always push something at this position
1711       }
1712     }
1713   }
1714
1715   return \@attrs;
1716 }
1717
1718 sub _execute {
1719   my ($self, $op, $ident, @args) = @_;
1720
1721   my ($sql, $bind) = $self->_prep_for_execute($op, $ident, \@args);
1722
1723   shift->dbh_do(    # retry over disconnects
1724     '_dbh_execute',
1725     $sql,
1726     $bind,
1727     $self->_dbi_attrs_for_bind($ident, $bind)
1728   );
1729 }
1730
1731 sub _dbh_execute {
1732   my ($self, undef, $sql, $bind, $bind_attrs) = @_;
1733
1734   $self->_query_start( $sql, $bind );
1735   my $sth = $self->_sth($sql);
1736
1737   for my $i (0 .. $#$bind) {
1738     if (ref $bind->[$i][1] eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1739       $sth->bind_param_inout(
1740         $i + 1, # bind params counts are 1-based
1741         $bind->[$i][1],
1742         $bind->[$i][0]{dbd_size} || $self->_max_column_bytesize($bind->[$i][0]), # size
1743         $bind_attrs->[$i],
1744       );
1745     }
1746     else {
1747       $sth->bind_param(
1748         $i + 1,
1749         (ref $bind->[$i][1] and overload::Method($bind->[$i][1], '""'))
1750           ? "$bind->[$i][1]"
1751           : $bind->[$i][1]
1752         ,
1753         $bind_attrs->[$i],
1754       );
1755     }
1756   }
1757
1758   # Can this fail without throwing an exception anyways???
1759   my $rv = $sth->execute();
1760   $self->throw_exception(
1761     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1762   ) if !$rv;
1763
1764   $self->_query_end( $sql, $bind );
1765
1766   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1767 }
1768
1769 sub _prefetch_autovalues {
1770   my ($self, $source, $to_insert) = @_;
1771
1772   my $colinfo = $source->columns_info;
1773
1774   my %values;
1775   for my $col (keys %$colinfo) {
1776     if (
1777       $colinfo->{$col}{auto_nextval}
1778         and
1779       (
1780         ! exists $to_insert->{$col}
1781           or
1782         ref $to_insert->{$col} eq 'SCALAR'
1783           or
1784         (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1785       )
1786     ) {
1787       $values{$col} = $self->_sequence_fetch(
1788         'NEXTVAL',
1789         ( $colinfo->{$col}{sequence} ||=
1790             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1791         ),
1792       );
1793     }
1794   }
1795
1796   \%values;
1797 }
1798
1799 sub insert {
1800   my ($self, $source, $to_insert) = @_;
1801
1802   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1803
1804   # fuse the values, but keep a separate list of prefetched_values so that
1805   # they can be fused once again with the final return
1806   $to_insert = { %$to_insert, %$prefetched_values };
1807
1808   # FIXME - we seem to assume undef values as non-supplied. This is wrong.
1809   # Investigate what does it take to s/defined/exists/
1810   my $col_infos = $source->columns_info;
1811   my %pcols = map { $_ => 1 } $source->primary_columns;
1812   my (%retrieve_cols, $autoinc_supplied, $retrieve_autoinc_col);
1813   for my $col ($source->columns) {
1814     if ($col_infos->{$col}{is_auto_increment}) {
1815       $autoinc_supplied ||= 1 if defined $to_insert->{$col};
1816       $retrieve_autoinc_col ||= $col unless $autoinc_supplied;
1817     }
1818
1819     # nothing to retrieve when explicit values are supplied
1820     next if (defined $to_insert->{$col} and ! (
1821       ref $to_insert->{$col} eq 'SCALAR'
1822         or
1823       (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1824     ));
1825
1826     # the 'scalar keys' is a trick to preserve the ->columns declaration order
1827     $retrieve_cols{$col} = scalar keys %retrieve_cols if (
1828       $pcols{$col}
1829         or
1830       $col_infos->{$col}{retrieve_on_insert}
1831     );
1832   };
1833
1834   local $self->{_autoinc_supplied_for_op} = $autoinc_supplied;
1835   local $self->{_perform_autoinc_retrieval} = $retrieve_autoinc_col;
1836
1837   my ($sqla_opts, @ir_container);
1838   if (%retrieve_cols and $self->_use_insert_returning) {
1839     $sqla_opts->{returning_container} = \@ir_container
1840       if $self->_use_insert_returning_bound;
1841
1842     $sqla_opts->{returning} = [
1843       sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
1844     ];
1845   }
1846
1847   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
1848
1849   my %returned_cols = %$to_insert;
1850   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
1851     @ir_container = try {
1852       local $SIG{__WARN__} = sub {};
1853       my @r = $sth->fetchrow_array;
1854       $sth->finish;
1855       @r;
1856     } unless @ir_container;
1857
1858     @returned_cols{@$retlist} = @ir_container if @ir_container;
1859   }
1860   else {
1861     # pull in PK if needed and then everything else
1862     if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
1863
1864       $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
1865         unless $self->can('last_insert_id');
1866
1867       my @pri_values = $self->last_insert_id($source, @missing_pri);
1868
1869       $self->throw_exception( "Can't get last insert id" )
1870         unless (@pri_values == @missing_pri);
1871
1872       @returned_cols{@missing_pri} = @pri_values;
1873       delete $retrieve_cols{$_} for @missing_pri;
1874     }
1875
1876     # if there is more left to pull
1877     if (%retrieve_cols) {
1878       $self->throw_exception(
1879         'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
1880       ) unless %pcols;
1881
1882       my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
1883
1884       my $cur = DBIx::Class::ResultSet->new($source, {
1885         where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
1886         select => \@left_to_fetch,
1887       })->cursor;
1888
1889       @returned_cols{@left_to_fetch} = $cur->next;
1890
1891       $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
1892         if scalar $cur->next;
1893     }
1894   }
1895
1896   return { %$prefetched_values, %returned_cols };
1897 }
1898
1899 sub insert_bulk {
1900   my ($self, $source, $cols, $data) = @_;
1901
1902   my @col_range = (0..$#$cols);
1903
1904   # FIXME - perhaps this is not even needed? does DBI stringify?
1905   #
1906   # forcibly stringify whatever is stringifiable
1907   # ResultSet::populate() hands us a copy - safe to mangle
1908   for my $r (0 .. $#$data) {
1909     for my $c (0 .. $#{$data->[$r]}) {
1910       $data->[$r][$c] = "$data->[$r][$c]"
1911         if ( ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
1912     }
1913   }
1914
1915   my $colinfos = $source->columns_info($cols);
1916
1917   local $self->{_autoinc_supplied_for_op} =
1918     (first { $_->{is_auto_increment} } values %$colinfos)
1919       ? 1
1920       : 0
1921   ;
1922
1923   # get a slice type index based on first row of data
1924   # a "column" in this context may refer to more than one bind value
1925   # e.g. \[ '?, ?', [...], [...] ]
1926   #
1927   # construct the value type index - a description of values types for every
1928   # per-column slice of $data:
1929   #
1930   # nonexistent - nonbind literal
1931   # 0 - regular value
1932   # [] of bindattrs - resolved attribute(s) of bind(s) passed via literal+bind \[] combo
1933   #
1934   # also construct the column hash to pass to the SQL generator. For plain
1935   # (non literal) values - convert the members of the first row into a
1936   # literal+bind combo, with extra positional info in the bind attr hashref.
1937   # This will allow us to match the order properly, and is so contrived
1938   # because a user-supplied literal/bind (or something else specific to a
1939   # resultsource and/or storage driver) can inject extra binds along the
1940   # way, so one can't rely on "shift positions" ordering at all. Also we
1941   # can't just hand SQLA a set of some known "values" (e.g. hashrefs that
1942   # can be later matched up by address), because we want to supply a real
1943   # value on which perhaps e.g. datatype checks will be performed
1944   my ($proto_data, $value_type_idx);
1945   for my $i (@col_range) {
1946     my $colname = $cols->[$i];
1947     if (ref $data->[0][$i] eq 'SCALAR') {
1948       # no bind value at all - no type
1949
1950       $proto_data->{$colname} = $data->[0][$i];
1951     }
1952     elsif (ref $data->[0][$i] eq 'REF' and ref ${$data->[0][$i]} eq 'ARRAY' ) {
1953       # repack, so we don't end up mangling the original \[]
1954       my ($sql, @bind) = @${$data->[0][$i]};
1955
1956       # normalization of user supplied stuff
1957       my $resolved_bind = $self->_resolve_bindattrs(
1958         $source, \@bind, $colinfos,
1959       );
1960
1961       # store value-less (attrs only) bind info - we will be comparing all
1962       # supplied binds against this for sanity
1963       $value_type_idx->{$i} = [ map { $_->[0] } @$resolved_bind ];
1964
1965       $proto_data->{$colname} = \[ $sql, map { [
1966         # inject slice order to use for $proto_bind construction
1967           { %{$resolved_bind->[$_][0]}, _bind_data_slice_idx => $i }
1968             =>
1969           $resolved_bind->[$_][1]
1970         ] } (0 .. $#bind)
1971       ];
1972     }
1973     else {
1974       $value_type_idx->{$i} = 0;
1975
1976       $proto_data->{$colname} = \[ '?', [
1977         { dbic_colname => $colname, _bind_data_slice_idx => $i }
1978           =>
1979         $data->[0][$i]
1980       ] ];
1981     }
1982   }
1983
1984   my ($sql, $proto_bind) = $self->_prep_for_execute (
1985     'insert',
1986     $source,
1987     [ $proto_data ],
1988   );
1989
1990   if (! @$proto_bind and keys %$value_type_idx) {
1991     # if the bindlist is empty and we had some dynamic binds, this means the
1992     # storage ate them away (e.g. the NoBindVars component) and interpolated
1993     # them directly into the SQL. This obviously can't be good for multi-inserts
1994     $self->throw_exception('Cannot insert_bulk without support for placeholders');
1995   }
1996
1997   # sanity checks
1998   # FIXME - devise a flag "no babysitting" or somesuch to shut this off
1999   #
2000   # use an error reporting closure for convenience (less to pass)
2001   my $bad_slice_report_cref = sub {
2002     my ($msg, $r_idx, $c_idx) = @_;
2003     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
2004       $msg,
2005       $cols->[$c_idx],
2006       do {
2007         require Data::Dumper::Concise;
2008         local $Data::Dumper::Maxdepth = 5;
2009         Data::Dumper::Concise::Dumper ({
2010           map { $cols->[$_] =>
2011             $data->[$r_idx][$_]
2012           } @col_range
2013         }),
2014       }
2015     );
2016   };
2017
2018   for my $col_idx (@col_range) {
2019     my $reference_val = $data->[0][$col_idx];
2020
2021     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
2022       my $val = $data->[$row_idx][$col_idx];
2023
2024       if (! exists $value_type_idx->{$col_idx}) { # literal no binds
2025         if (ref $val ne 'SCALAR') {
2026           $bad_slice_report_cref->(
2027             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
2028             $row_idx,
2029             $col_idx,
2030           );
2031         }
2032         elsif ($$val ne $$reference_val) {
2033           $bad_slice_report_cref->(
2034             "Inconsistent literal SQL value (expecting \\'$$reference_val')",
2035             $row_idx,
2036             $col_idx,
2037           );
2038         }
2039       }
2040       elsif (! $value_type_idx->{$col_idx} ) {  # regular non-literal value
2041         if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
2042           $bad_slice_report_cref->("Literal SQL found where a plain bind value is expected", $row_idx, $col_idx);
2043         }
2044       }
2045       else {  # binds from a \[], compare type and attrs
2046         if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
2047           $bad_slice_report_cref->(
2048             "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
2049             $row_idx,
2050             $col_idx,
2051           );
2052         }
2053         # start drilling down and bail out early on identical refs
2054         elsif (
2055           $reference_val != $val
2056             or
2057           $$reference_val != $$val
2058         ) {
2059           if (${$val}->[0] ne ${$reference_val}->[0]) {
2060             $bad_slice_report_cref->(
2061               "Inconsistent literal/bind SQL (expecting \\['${$reference_val}->[0]', ... ])",
2062               $row_idx,
2063               $col_idx,
2064             );
2065           }
2066           # need to check the bind attrs - a bind will happen only once for
2067           # the entire dataset, so any changes further down will be ignored.
2068           elsif (! Data::Compare::Compare(
2069             $value_type_idx->{$col_idx},
2070             [
2071               map
2072               { $_->[0] }
2073               @{$self->_resolve_bindattrs(
2074                 $source, [ @{$$val}[1 .. $#$$val] ], $colinfos,
2075               )}
2076             ],
2077           )) {
2078             $bad_slice_report_cref->(
2079               'Differing bind attributes on literal/bind values not supported',
2080               $row_idx,
2081               $col_idx,
2082             );
2083           }
2084         }
2085       }
2086     }
2087   }
2088
2089   # neither _dbh_execute_for_fetch, nor _dbh_execute_inserts_with_no_binds
2090   # are atomic (even if execute_for_fetch is a single call). Thus a safety
2091   # scope guard
2092   my $guard = $self->txn_scope_guard;
2093
2094   $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
2095   my $sth = $self->_sth($sql);
2096   my $rv = do {
2097     if (@$proto_bind) {
2098       # proto bind contains the information on which pieces of $data to pull
2099       # $cols is passed in only for prettier error-reporting
2100       $self->_dbh_execute_for_fetch( $source, $sth, $proto_bind, $cols, $data );
2101     }
2102     else {
2103       # bind_param_array doesn't work if there are no binds
2104       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
2105     }
2106   };
2107
2108   $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
2109
2110   $guard->commit;
2111
2112   return wantarray ? ($rv, $sth, @$proto_bind) : $rv;
2113 }
2114
2115 # execute_for_fetch is capable of returning data just fine (it means it
2116 # can be used for INSERT...RETURNING and UPDATE...RETURNING. Since this
2117 # is the void-populate fast-path we will just ignore this altogether
2118 # for the time being.
2119 sub _dbh_execute_for_fetch {
2120   my ($self, $source, $sth, $proto_bind, $cols, $data) = @_;
2121
2122   my @idx_range = ( 0 .. $#$proto_bind );
2123
2124   # If we have any bind attributes to take care of, we will bind the
2125   # proto-bind data (which will never be used by execute_for_fetch)
2126   # However since column bindtypes are "sticky", this is sufficient
2127   # to get the DBD to apply the bindtype to all values later on
2128
2129   my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
2130
2131   for my $i (@idx_range) {
2132     $sth->bind_param (
2133       $i+1, # DBI bind indexes are 1-based
2134       $proto_bind->[$i][1],
2135       $bind_attrs->[$i],
2136     ) if defined $bind_attrs->[$i];
2137   }
2138
2139   # At this point $data slots named in the _bind_data_slice_idx of
2140   # each piece of $proto_bind are either \[]s or plain values to be
2141   # passed in. Construct the dispensing coderef. *NOTE* the order
2142   # of $data will differ from this of the ?s in the SQL (due to
2143   # alphabetical ordering by colname). We actually do want to
2144   # preserve this behavior so that prepare_cached has a better
2145   # chance of matching on unrelated calls
2146   my %data_reorder = map { $proto_bind->[$_][0]{_bind_data_slice_idx} => $_ } @idx_range;
2147
2148   my $fetch_row_idx = -1; # saner loop this way
2149   my $fetch_tuple = sub {
2150     return undef if ++$fetch_row_idx > $#$data;
2151
2152     return [ map
2153       { (ref $_ eq 'REF' and ref $$_ eq 'ARRAY')
2154         ? map { $_->[-1] } @{$$_}[1 .. $#$$_]
2155         : $_
2156       }
2157       map
2158         { $data->[$fetch_row_idx][$_]}
2159         sort
2160           { $data_reorder{$a} <=> $data_reorder{$b} }
2161           keys %data_reorder
2162     ];
2163   };
2164
2165   my $tuple_status = [];
2166   my ($rv, $err);
2167   try {
2168     $rv = $sth->execute_for_fetch(
2169       $fetch_tuple,
2170       $tuple_status,
2171     );
2172   }
2173   catch {
2174     $err = shift;
2175   };
2176
2177   # Not all DBDs are create equal. Some throw on error, some return
2178   # an undef $rv, and some set $sth->err - try whatever we can
2179   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
2180     ! defined $err
2181       and
2182     ( !defined $rv or $sth->err )
2183   );
2184
2185   # Statement must finish even if there was an exception.
2186   try {
2187     $sth->finish
2188   }
2189   catch {
2190     $err = shift unless defined $err
2191   };
2192
2193   if (defined $err) {
2194     my $i = 0;
2195     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
2196
2197     $self->throw_exception("Unexpected populate error: $err")
2198       if ($i > $#$tuple_status);
2199
2200     require Data::Dumper::Concise;
2201     $self->throw_exception(sprintf "execute_for_fetch() aborted with '%s' at populate slice:\n%s",
2202       ($tuple_status->[$i][1] || $err),
2203       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
2204     );
2205   }
2206
2207   return $rv;
2208 }
2209
2210 sub _dbh_execute_inserts_with_no_binds {
2211   my ($self, $sth, $count) = @_;
2212
2213   my $err;
2214   try {
2215     my $dbh = $self->_get_dbh;
2216     local $dbh->{RaiseError} = 1;
2217     local $dbh->{PrintError} = 0;
2218
2219     $sth->execute foreach 1..$count;
2220   }
2221   catch {
2222     $err = shift;
2223   };
2224
2225   # Make sure statement is finished even if there was an exception.
2226   try {
2227     $sth->finish
2228   }
2229   catch {
2230     $err = shift unless defined $err;
2231   };
2232
2233   $self->throw_exception($err) if defined $err;
2234
2235   return $count;
2236 }
2237
2238 sub update {
2239   #my ($self, $source, @args) = @_;
2240   shift->_execute('update', @_);
2241 }
2242
2243
2244 sub delete {
2245   #my ($self, $source, @args) = @_;
2246   shift->_execute('delete', @_);
2247 }
2248
2249 sub _select {
2250   my $self = shift;
2251   $self->_execute($self->_select_args(@_));
2252 }
2253
2254 sub _select_args_to_query {
2255   my $self = shift;
2256
2257   $self->throw_exception(
2258     "Unable to generate limited query representation with 'software_limit' enabled"
2259   ) if ($_[3]->{software_limit} and ($_[3]->{offset} or $_[3]->{rows}) );
2260
2261   # my ($op, $ident, $select, $cond, $rs_attrs, $rows, $offset)
2262   #  = $self->_select_args($ident, $select, $cond, $attrs);
2263   my ($op, $ident, @args) =
2264     $self->_select_args(@_);
2265
2266   # my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2267   my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, \@args);
2268   $prepared_bind ||= [];
2269
2270   return wantarray
2271     ? ($sql, $prepared_bind)
2272     : \[ "($sql)", @$prepared_bind ]
2273   ;
2274 }
2275
2276 sub _select_args {
2277   my ($self, $ident, $select, $where, $attrs) = @_;
2278
2279   my $sql_maker = $self->sql_maker;
2280   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2281
2282   $attrs = {
2283     %$attrs,
2284     select => $select,
2285     from => $ident,
2286     where => $where,
2287     $rs_alias && $alias2source->{$rs_alias}
2288       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2289       : ()
2290     ,
2291   };
2292
2293   # Sanity check the attributes (SQLMaker does it too, but
2294   # in case of a software_limit we'll never reach there)
2295   if (defined $attrs->{offset}) {
2296     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2297       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2298   }
2299
2300   if (defined $attrs->{rows}) {
2301     $self->throw_exception("The rows attribute must be a positive integer if present")
2302       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2303   }
2304   elsif ($attrs->{offset}) {
2305     # MySQL actually recommends this approach.  I cringe.
2306     $attrs->{rows} = $sql_maker->__max_int;
2307   }
2308
2309   my @limit;
2310
2311   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2312   # storage, unless software limit was requested
2313   if (
2314     #limited has_many
2315     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2316        ||
2317     # grouped prefetch (to satisfy group_by == select)
2318     ( $attrs->{group_by}
2319         &&
2320       @{$attrs->{group_by}}
2321         &&
2322       $attrs->{_prefetch_selector_range}
2323     )
2324   ) {
2325     ($ident, $select, $where, $attrs)
2326       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2327   }
2328   elsif (! $attrs->{software_limit} ) {
2329     push @limit, (
2330       $attrs->{rows} || (),
2331       $attrs->{offset} || (),
2332     );
2333   }
2334
2335   # try to simplify the joinmap further (prune unreferenced type-single joins)
2336   if (
2337     ref $ident
2338       and
2339     reftype $ident eq 'ARRAY'
2340       and
2341     @$ident != 1
2342   ) {
2343     $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2344   }
2345
2346 ###
2347   # This would be the point to deflate anything found in $where
2348   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2349   # expect a row object. And all we have is a resultsource (it is trivial
2350   # to extract deflator coderefs via $alias2source above).
2351   #
2352   # I don't see a way forward other than changing the way deflators are
2353   # invoked, and that's just bad...
2354 ###
2355
2356   return ('select', $ident, $select, $where, $attrs, @limit);
2357 }
2358
2359 # Returns a counting SELECT for a simple count
2360 # query. Abstracted so that a storage could override
2361 # this to { count => 'firstcol' } or whatever makes
2362 # sense as a performance optimization
2363 sub _count_select {
2364   #my ($self, $source, $rs_attrs) = @_;
2365   return { count => '*' };
2366 }
2367
2368 sub source_bind_attributes {
2369   shift->throw_exception(
2370     'source_bind_attributes() was never meant to be a callable public method - '
2371    .'please contact the DBIC dev-team and describe your use case so that a reasonable '
2372    .'solution can be provided'
2373    ."\nhttp://search.cpan.org/dist/DBIx-Class/lib/DBIx/Class.pm#GETTING_HELP/SUPPORT"
2374   );
2375 }
2376
2377 =head2 select
2378
2379 =over 4
2380
2381 =item Arguments: $ident, $select, $condition, $attrs
2382
2383 =back
2384
2385 Handle a SQL select statement.
2386
2387 =cut
2388
2389 sub select {
2390   my $self = shift;
2391   my ($ident, $select, $condition, $attrs) = @_;
2392   return $self->cursor_class->new($self, \@_, $attrs);
2393 }
2394
2395 sub select_single {
2396   my $self = shift;
2397   my ($rv, $sth, @bind) = $self->_select(@_);
2398   my @row = $sth->fetchrow_array;
2399   my @nextrow = $sth->fetchrow_array if @row;
2400   if(@row && @nextrow) {
2401     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2402   }
2403   # Need to call finish() to work round broken DBDs
2404   $sth->finish();
2405   return @row;
2406 }
2407
2408 =head2 sql_limit_dialect
2409
2410 This is an accessor for the default SQL limit dialect used by a particular
2411 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2412 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2413 see L<DBIx::Class::SQLMaker::LimitDialects>.
2414
2415 =cut
2416
2417 sub _dbh_sth {
2418   my ($self, $dbh, $sql) = @_;
2419
2420   # 3 is the if_active parameter which avoids active sth re-use
2421   my $sth = $self->disable_sth_caching
2422     ? $dbh->prepare($sql)
2423     : $dbh->prepare_cached($sql, {}, 3);
2424
2425   # XXX You would think RaiseError would make this impossible,
2426   #  but apparently that's not true :(
2427   $self->throw_exception(
2428     $dbh->errstr
2429       ||
2430     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
2431             .'an exception and/or setting $dbh->errstr',
2432       length ($sql) > 20
2433         ? substr($sql, 0, 20) . '...'
2434         : $sql
2435       ,
2436       'DBD::' . $dbh->{Driver}{Name},
2437     )
2438   ) if !$sth;
2439
2440   $sth;
2441 }
2442
2443 sub sth {
2444   carp_unique 'sth was mistakenly marked/documented as public, stop calling it (will be removed before DBIC v0.09)';
2445   shift->_sth(@_);
2446 }
2447
2448 sub _sth {
2449   my ($self, $sql) = @_;
2450   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2451 }
2452
2453 sub _dbh_columns_info_for {
2454   my ($self, $dbh, $table) = @_;
2455
2456   if ($dbh->can('column_info')) {
2457     my %result;
2458     my $caught;
2459     try {
2460       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2461       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2462       $sth->execute();
2463       while ( my $info = $sth->fetchrow_hashref() ){
2464         my %column_info;
2465         $column_info{data_type}   = $info->{TYPE_NAME};
2466         $column_info{size}      = $info->{COLUMN_SIZE};
2467         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2468         $column_info{default_value} = $info->{COLUMN_DEF};
2469         my $col_name = $info->{COLUMN_NAME};
2470         $col_name =~ s/^\"(.*)\"$/$1/;
2471
2472         $result{$col_name} = \%column_info;
2473       }
2474     } catch {
2475       $caught = 1;
2476     };
2477     return \%result if !$caught && scalar keys %result;
2478   }
2479
2480   my %result;
2481   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2482   $sth->execute;
2483   my @columns = @{$sth->{NAME_lc}};
2484   for my $i ( 0 .. $#columns ){
2485     my %column_info;
2486     $column_info{data_type} = $sth->{TYPE}->[$i];
2487     $column_info{size} = $sth->{PRECISION}->[$i];
2488     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2489
2490     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2491       $column_info{data_type} = $1;
2492       $column_info{size}    = $2;
2493     }
2494
2495     $result{$columns[$i]} = \%column_info;
2496   }
2497   $sth->finish;
2498
2499   foreach my $col (keys %result) {
2500     my $colinfo = $result{$col};
2501     my $type_num = $colinfo->{data_type};
2502     my $type_name;
2503     if(defined $type_num && $dbh->can('type_info')) {
2504       my $type_info = $dbh->type_info($type_num);
2505       $type_name = $type_info->{TYPE_NAME} if $type_info;
2506       $colinfo->{data_type} = $type_name if $type_name;
2507     }
2508   }
2509
2510   return \%result;
2511 }
2512
2513 sub columns_info_for {
2514   my ($self, $table) = @_;
2515   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2516 }
2517
2518 =head2 last_insert_id
2519
2520 Return the row id of the last insert.
2521
2522 =cut
2523
2524 sub _dbh_last_insert_id {
2525     my ($self, $dbh, $source, $col) = @_;
2526
2527     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2528
2529     return $id if defined $id;
2530
2531     my $class = ref $self;
2532     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2533 }
2534
2535 sub last_insert_id {
2536   my $self = shift;
2537   $self->_dbh_last_insert_id ($self->_dbh, @_);
2538 }
2539
2540 =head2 _native_data_type
2541
2542 =over 4
2543
2544 =item Arguments: $type_name
2545
2546 =back
2547
2548 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2549 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2550 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2551
2552 The default implementation returns C<undef>, implement in your Storage driver if
2553 you need this functionality.
2554
2555 Should map types from other databases to the native RDBMS type, for example
2556 C<VARCHAR2> to C<VARCHAR>.
2557
2558 Types with modifiers should map to the underlying data type. For example,
2559 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2560
2561 Composite types should map to the container type, for example
2562 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2563
2564 =cut
2565
2566 sub _native_data_type {
2567   #my ($self, $data_type) = @_;
2568   return undef
2569 }
2570
2571 # Check if placeholders are supported at all
2572 sub _determine_supports_placeholders {
2573   my $self = shift;
2574   my $dbh  = $self->_get_dbh;
2575
2576   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2577   # but it is inaccurate more often than not
2578   return try {
2579     local $dbh->{PrintError} = 0;
2580     local $dbh->{RaiseError} = 1;
2581     $dbh->do('select ?', {}, 1);
2582     1;
2583   }
2584   catch {
2585     0;
2586   };
2587 }
2588
2589 # Check if placeholders bound to non-string types throw exceptions
2590 #
2591 sub _determine_supports_typeless_placeholders {
2592   my $self = shift;
2593   my $dbh  = $self->_get_dbh;
2594
2595   return try {
2596     local $dbh->{PrintError} = 0;
2597     local $dbh->{RaiseError} = 1;
2598     # this specifically tests a bind that is NOT a string
2599     $dbh->do('select 1 where 1 = ?', {}, 1);
2600     1;
2601   }
2602   catch {
2603     0;
2604   };
2605 }
2606
2607 =head2 sqlt_type
2608
2609 Returns the database driver name.
2610
2611 =cut
2612
2613 sub sqlt_type {
2614   shift->_get_dbh->{Driver}->{Name};
2615 }
2616
2617 =head2 bind_attribute_by_data_type
2618
2619 Given a datatype from column info, returns a database specific bind
2620 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2621 let the database planner just handle it.
2622
2623 Generally only needed for special case column types, like bytea in postgres.
2624
2625 =cut
2626
2627 sub bind_attribute_by_data_type {
2628     return;
2629 }
2630
2631 =head2 is_datatype_numeric
2632
2633 Given a datatype from column_info, returns a boolean value indicating if
2634 the current RDBMS considers it a numeric value. This controls how
2635 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2636 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2637 be performed instead of the usual C<eq>.
2638
2639 =cut
2640
2641 sub is_datatype_numeric {
2642   #my ($self, $dt) = @_;
2643
2644   return 0 unless $_[1];
2645
2646   $_[1] =~ /^ (?:
2647     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2648   ) $/ix;
2649 }
2650
2651
2652 =head2 create_ddl_dir
2653
2654 =over 4
2655
2656 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2657
2658 =back
2659
2660 Creates a SQL file based on the Schema, for each of the specified
2661 database engines in C<\@databases> in the given directory.
2662 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2663
2664 Given a previous version number, this will also create a file containing
2665 the ALTER TABLE statements to transform the previous schema into the
2666 current one. Note that these statements may contain C<DROP TABLE> or
2667 C<DROP COLUMN> statements that can potentially destroy data.
2668
2669 The file names are created using the C<ddl_filename> method below, please
2670 override this method in your schema if you would like a different file
2671 name format. For the ALTER file, the same format is used, replacing
2672 $version in the name with "$preversion-$version".
2673
2674 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2675 The most common value for this would be C<< { add_drop_table => 1 } >>
2676 to have the SQL produced include a C<DROP TABLE> statement for each table
2677 created. For quoting purposes supply C<quote_table_names> and
2678 C<quote_field_names>.
2679
2680 If no arguments are passed, then the following default values are assumed:
2681
2682 =over 4
2683
2684 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2685
2686 =item version    - $schema->schema_version
2687
2688 =item directory  - './'
2689
2690 =item preversion - <none>
2691
2692 =back
2693
2694 By default, C<\%sqlt_args> will have
2695
2696  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2697
2698 merged with the hash passed in. To disable any of those features, pass in a
2699 hashref like the following
2700
2701  { ignore_constraint_names => 0, # ... other options }
2702
2703
2704 WARNING: You are strongly advised to check all SQL files created, before applying
2705 them.
2706
2707 =cut
2708
2709 sub create_ddl_dir {
2710   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2711
2712   unless ($dir) {
2713     carp "No directory given, using ./\n";
2714     $dir = './';
2715   } else {
2716       -d $dir
2717         or
2718       (require File::Path and File::Path::make_path ("$dir"))  # make_path does not like objects (i.e. Path::Class::Dir)
2719         or
2720       $self->throw_exception(
2721         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2722       );
2723   }
2724
2725   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2726
2727   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2728   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2729
2730   my $schema_version = $schema->schema_version || '1.x';
2731   $version ||= $schema_version;
2732
2733   $sqltargs = {
2734     add_drop_table => 1,
2735     ignore_constraint_names => 1,
2736     ignore_index_names => 1,
2737     %{$sqltargs || {}}
2738   };
2739
2740   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2741     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2742   }
2743
2744   my $sqlt = SQL::Translator->new( $sqltargs );
2745
2746   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2747   my $sqlt_schema = $sqlt->translate({ data => $schema })
2748     or $self->throw_exception ($sqlt->error);
2749
2750   foreach my $db (@$databases) {
2751     $sqlt->reset();
2752     $sqlt->{schema} = $sqlt_schema;
2753     $sqlt->producer($db);
2754
2755     my $file;
2756     my $filename = $schema->ddl_filename($db, $version, $dir);
2757     if (-e $filename && ($version eq $schema_version )) {
2758       # if we are dumping the current version, overwrite the DDL
2759       carp "Overwriting existing DDL file - $filename";
2760       unlink($filename);
2761     }
2762
2763     my $output = $sqlt->translate;
2764     if(!$output) {
2765       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2766       next;
2767     }
2768     if(!open($file, ">$filename")) {
2769       $self->throw_exception("Can't open $filename for writing ($!)");
2770       next;
2771     }
2772     print $file $output;
2773     close($file);
2774
2775     next unless ($preversion);
2776
2777     require SQL::Translator::Diff;
2778
2779     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2780     if(!-e $prefilename) {
2781       carp("No previous schema file found ($prefilename)");
2782       next;
2783     }
2784
2785     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2786     if(-e $difffile) {
2787       carp("Overwriting existing diff file - $difffile");
2788       unlink($difffile);
2789     }
2790
2791     my $source_schema;
2792     {
2793       my $t = SQL::Translator->new($sqltargs);
2794       $t->debug( 0 );
2795       $t->trace( 0 );
2796
2797       $t->parser( $db )
2798         or $self->throw_exception ($t->error);
2799
2800       my $out = $t->translate( $prefilename )
2801         or $self->throw_exception ($t->error);
2802
2803       $source_schema = $t->schema;
2804
2805       $source_schema->name( $prefilename )
2806         unless ( $source_schema->name );
2807     }
2808
2809     # The "new" style of producers have sane normalization and can support
2810     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2811     # And we have to diff parsed SQL against parsed SQL.
2812     my $dest_schema = $sqlt_schema;
2813
2814     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2815       my $t = SQL::Translator->new($sqltargs);
2816       $t->debug( 0 );
2817       $t->trace( 0 );
2818
2819       $t->parser( $db )
2820         or $self->throw_exception ($t->error);
2821
2822       my $out = $t->translate( $filename )
2823         or $self->throw_exception ($t->error);
2824
2825       $dest_schema = $t->schema;
2826
2827       $dest_schema->name( $filename )
2828         unless $dest_schema->name;
2829     }
2830
2831     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2832                                                   $dest_schema,   $db,
2833                                                   $sqltargs
2834                                                  );
2835     if(!open $file, ">$difffile") {
2836       $self->throw_exception("Can't write to $difffile ($!)");
2837       next;
2838     }
2839     print $file $diff;
2840     close($file);
2841   }
2842 }
2843
2844 =head2 deployment_statements
2845
2846 =over 4
2847
2848 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2849
2850 =back
2851
2852 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2853
2854 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2855 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2856
2857 C<$directory> is used to return statements from files in a previously created
2858 L</create_ddl_dir> directory and is optional. The filenames are constructed
2859 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2860
2861 If no C<$directory> is specified then the statements are constructed on the
2862 fly using L<SQL::Translator> and C<$version> is ignored.
2863
2864 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2865
2866 =cut
2867
2868 sub deployment_statements {
2869   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2870   $type ||= $self->sqlt_type;
2871   $version ||= $schema->schema_version || '1.x';
2872   $dir ||= './';
2873   my $filename = $schema->ddl_filename($type, $version, $dir);
2874   if(-f $filename)
2875   {
2876       # FIXME replace this block when a proper sane sql parser is available
2877       my $file;
2878       open($file, "<$filename")
2879         or $self->throw_exception("Can't open $filename ($!)");
2880       my @rows = <$file>;
2881       close($file);
2882       return join('', @rows);
2883   }
2884
2885   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2886     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2887   }
2888
2889   # sources needs to be a parser arg, but for simplicty allow at top level
2890   # coming in
2891   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2892       if exists $sqltargs->{sources};
2893
2894   my $tr = SQL::Translator->new(
2895     producer => "SQL::Translator::Producer::${type}",
2896     %$sqltargs,
2897     parser => 'SQL::Translator::Parser::DBIx::Class',
2898     data => $schema,
2899   );
2900
2901   return preserve_context {
2902     $tr->translate
2903   } after => sub {
2904     $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2905       unless defined $_[0];
2906   };
2907 }
2908
2909 # FIXME deploy() currently does not accurately report sql errors
2910 # Will always return true while errors are warned
2911 sub deploy {
2912   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2913   my $deploy = sub {
2914     my $line = shift;
2915     return if(!$line);
2916     return if($line =~ /^--/);
2917     # next if($line =~ /^DROP/m);
2918     return if($line =~ /^BEGIN TRANSACTION/m);
2919     return if($line =~ /^COMMIT/m);
2920     return if $line =~ /^\s+$/; # skip whitespace only
2921     $self->_query_start($line);
2922     try {
2923       # do a dbh_do cycle here, as we need some error checking in
2924       # place (even though we will ignore errors)
2925       $self->dbh_do (sub { $_[1]->do($line) });
2926     } catch {
2927       carp qq{$_ (running "${line}")};
2928     };
2929     $self->_query_end($line);
2930   };
2931   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2932   if (@statements > 1) {
2933     foreach my $statement (@statements) {
2934       $deploy->( $statement );
2935     }
2936   }
2937   elsif (@statements == 1) {
2938     # split on single line comments and end of statements
2939     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
2940       $deploy->( $line );
2941     }
2942   }
2943 }
2944
2945 =head2 datetime_parser
2946
2947 Returns the datetime parser class
2948
2949 =cut
2950
2951 sub datetime_parser {
2952   my $self = shift;
2953   return $self->{datetime_parser} ||= do {
2954     $self->build_datetime_parser(@_);
2955   };
2956 }
2957
2958 =head2 datetime_parser_type
2959
2960 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
2961
2962 =head2 build_datetime_parser
2963
2964 See L</datetime_parser>
2965
2966 =cut
2967
2968 sub build_datetime_parser {
2969   my $self = shift;
2970   my $type = $self->datetime_parser_type(@_);
2971   return $type;
2972 }
2973
2974
2975 =head2 is_replicating
2976
2977 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2978 replicate from a master database.  Default is undef, which is the result
2979 returned by databases that don't support replication.
2980
2981 =cut
2982
2983 sub is_replicating {
2984     return;
2985
2986 }
2987
2988 =head2 lag_behind_master
2989
2990 Returns a number that represents a certain amount of lag behind a master db
2991 when a given storage is replicating.  The number is database dependent, but
2992 starts at zero and increases with the amount of lag. Default in undef
2993
2994 =cut
2995
2996 sub lag_behind_master {
2997     return;
2998 }
2999
3000 =head2 relname_to_table_alias
3001
3002 =over 4
3003
3004 =item Arguments: $relname, $join_count
3005
3006 =back
3007
3008 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
3009 queries.
3010
3011 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
3012 way these aliases are named.
3013
3014 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
3015 otherwise C<"$relname">.
3016
3017 =cut
3018
3019 sub relname_to_table_alias {
3020   my ($self, $relname, $join_count) = @_;
3021
3022   my $alias = ($join_count && $join_count > 1 ?
3023     join('_', $relname, $join_count) : $relname);
3024
3025   return $alias;
3026 }
3027
3028 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
3029 # version and it may be necessary to amend or override it for a specific storage
3030 # if such binds are necessary.
3031 sub _max_column_bytesize {
3032   my ($self, $attr) = @_;
3033
3034   my $max_size;
3035
3036   if ($attr->{sqlt_datatype}) {
3037     my $data_type = lc($attr->{sqlt_datatype});
3038
3039     if ($attr->{sqlt_size}) {
3040
3041       # String/sized-binary types
3042       if ($data_type =~ /^(?:
3043           l? (?:var)? char(?:acter)? (?:\s*varying)?
3044             |
3045           (?:var)? binary (?:\s*varying)?
3046             |
3047           raw
3048         )\b/x
3049       ) {
3050         $max_size = $attr->{sqlt_size};
3051       }
3052       # Other charset/unicode types, assume scale of 4
3053       elsif ($data_type =~ /^(?:
3054           national \s* character (?:\s*varying)?
3055             |
3056           nchar
3057             |
3058           univarchar
3059             |
3060           nvarchar
3061         )\b/x
3062       ) {
3063         $max_size = $attr->{sqlt_size} * 4;
3064       }
3065     }
3066
3067     if (!$max_size and !$self->_is_lob_type($data_type)) {
3068       $max_size = 100 # for all other (numeric?) datatypes
3069     }
3070   }
3071
3072   $max_size || $self->_dbic_connect_attributes->{LongReadLen} || $self->_get_dbh->{LongReadLen} || 8000;
3073 }
3074
3075 # Determine if a data_type is some type of BLOB
3076 sub _is_lob_type {
3077   my ($self, $data_type) = @_;
3078   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
3079     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
3080                                   |varchar|character\s*varying|nvarchar
3081                                   |national\s*character\s*varying))?\z/xi);
3082 }
3083
3084 sub _is_binary_lob_type {
3085   my ($self, $data_type) = @_;
3086   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
3087     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
3088 }
3089
3090 sub _is_text_lob_type {
3091   my ($self, $data_type) = @_;
3092   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
3093     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
3094                         |national\s*character\s*varying))\z/xi);
3095 }
3096
3097 # Determine if a data_type is some type of a binary type
3098 sub _is_binary_type {
3099   my ($self, $data_type) = @_;
3100   $data_type && ($self->_is_binary_lob_type($data_type)
3101     || $data_type =~ /(?:var)?(?:binary|bit|graphic)(?:\s*varying)?/i);
3102 }
3103
3104 1;
3105
3106 =head1 USAGE NOTES
3107
3108 =head2 DBIx::Class and AutoCommit
3109
3110 DBIx::Class can do some wonderful magic with handling exceptions,
3111 disconnections, and transactions when you use C<< AutoCommit => 1 >>
3112 (the default) combined with L<txn_do|DBIx::Class::Storage/txn_do> for
3113 transaction support.
3114
3115 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
3116 in an assumed transaction between commits, and you're telling us you'd
3117 like to manage that manually.  A lot of the magic protections offered by
3118 this module will go away.  We can't protect you from exceptions due to database
3119 disconnects because we don't know anything about how to restart your
3120 transactions.  You're on your own for handling all sorts of exceptional
3121 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
3122 be with raw DBI.
3123
3124
3125 =head1 AUTHORS
3126
3127 Matt S. Trout <mst@shadowcatsystems.co.uk>
3128
3129 Andy Grundman <andy@hybridized.org>
3130
3131 =head1 LICENSE
3132
3133 You may distribute this code under the same terms as Perl itself.
3134
3135 =cut