6825e159bc93aa14740c0b51ab6b8ab64f4e11a7
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use DBIx::Class::Exception;
12 use Scalar::Util qw/refaddr weaken reftype blessed/;
13 use List::Util qw/first/;
14 use Sub::Name 'subname';
15 use Context::Preserve 'preserve_context';
16 use Try::Tiny;
17 use overload ();
18 use Data::Compare (); # no imports!!! guard against insane architecture
19 use DBI::Const::GetInfoType (); # no import of retarded global hash
20 use namespace::clean;
21
22 # default cursor class, overridable in connect_info attributes
23 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
24
25 __PACKAGE__->mk_group_accessors('inherited' => qw/
26   sql_limit_dialect sql_quote_char sql_name_sep
27 /);
28
29 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
30
31 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
32 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
33
34 __PACKAGE__->sql_name_sep('.');
35
36 __PACKAGE__->mk_group_accessors('simple' => qw/
37   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
38   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts _dbh_autocommit
39   _perform_autoinc_retrieval _autoinc_supplied_for_op
40 /);
41
42 # the values for these accessors are picked out (and deleted) from
43 # the attribute hashref passed to connect_info
44 my @storage_options = qw/
45   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
46   disable_sth_caching unsafe auto_savepoint
47 /;
48 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
49
50
51 # capability definitions, using a 2-tiered accessor system
52 # The rationale is:
53 #
54 # A driver/user may define _use_X, which blindly without any checks says:
55 # "(do not) use this capability", (use_dbms_capability is an "inherited"
56 # type accessor)
57 #
58 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
59 # accessor, which in turn calls _determine_supports_X, and stores the return
60 # in a special slot on the storage object, which is wiped every time a $dbh
61 # reconnection takes place (it is not guaranteed that upon reconnection we
62 # will get the same rdbms version). _determine_supports_X does not need to
63 # exist on a driver, as we ->can for it before calling.
64
65 my @capabilities = (qw/
66   insert_returning
67   insert_returning_bound
68
69   multicolumn_in
70
71   placeholders
72   typeless_placeholders
73
74   join_optimizer
75 /);
76 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
77 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
78
79 # on by default, not strictly a capability (pending rewrite)
80 __PACKAGE__->_use_join_optimizer (1);
81 sub _determine_supports_join_optimizer { 1 };
82
83 # Each of these methods need _determine_driver called before itself
84 # in order to function reliably. This is a purely DRY optimization
85 #
86 # get_(use)_dbms_capability need to be called on the correct Storage
87 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
88 # _determine_supports_X which obv. needs a correct driver as well
89 my @rdbms_specific_methods = qw/
90   deployment_statements
91   sqlt_type
92   sql_maker
93   build_datetime_parser
94   datetime_parser_type
95
96   txn_begin
97   insert
98   insert_bulk
99   update
100   delete
101   select
102   select_single
103   with_deferred_fk_checks
104
105   get_use_dbms_capability
106   get_dbms_capability
107
108   _server_info
109   _get_server_version
110 /;
111
112 for my $meth (@rdbms_specific_methods) {
113
114   my $orig = __PACKAGE__->can ($meth)
115     or die "$meth is not a ::Storage::DBI method!";
116
117   no strict qw/refs/;
118   no warnings qw/redefine/;
119   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
120     if (
121       # only fire when invoked on an instance, a valid class-based invocation
122       # would e.g. be setting a default for an inherited accessor
123       ref $_[0]
124         and
125       ! $_[0]->_driver_determined
126         and
127       ! $_[0]->{_in_determine_driver}
128     ) {
129       $_[0]->_determine_driver;
130
131       # This for some reason crashes and burns on perl 5.8.1
132       # IFF the method ends up throwing an exception
133       #goto $_[0]->can ($meth);
134
135       my $cref = $_[0]->can ($meth);
136       goto $cref;
137     }
138
139     goto $orig;
140   };
141 }
142
143 =head1 NAME
144
145 DBIx::Class::Storage::DBI - DBI storage handler
146
147 =head1 SYNOPSIS
148
149   my $schema = MySchema->connect('dbi:SQLite:my.db');
150
151   $schema->storage->debug(1);
152
153   my @stuff = $schema->storage->dbh_do(
154     sub {
155       my ($storage, $dbh, @args) = @_;
156       $dbh->do("DROP TABLE authors");
157     },
158     @column_list
159   );
160
161   $schema->resultset('Book')->search({
162      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
163   });
164
165 =head1 DESCRIPTION
166
167 This class represents the connection to an RDBMS via L<DBI>.  See
168 L<DBIx::Class::Storage> for general information.  This pod only
169 documents DBI-specific methods and behaviors.
170
171 =head1 METHODS
172
173 =cut
174
175 sub new {
176   my $new = shift->next::method(@_);
177
178   $new->_sql_maker_opts({});
179   $new->_dbh_details({});
180   $new->{_in_do_block} = 0;
181   $new->{_dbh_gen} = 0;
182
183   # read below to see what this does
184   $new->_arm_global_destructor;
185
186   $new;
187 }
188
189 # This is hack to work around perl shooting stuff in random
190 # order on exit(). If we do not walk the remaining storage
191 # objects in an END block, there is a *small but real* chance
192 # of a fork()ed child to kill the parent's shared DBI handle,
193 # *before perl reaches the DESTROY in this package*
194 # Yes, it is ugly and effective.
195 # Additionally this registry is used by the CLONE method to
196 # make sure no handles are shared between threads
197 {
198   my %seek_and_destroy;
199
200   sub _arm_global_destructor {
201     my $self = shift;
202     my $key = refaddr ($self);
203     $seek_and_destroy{$key} = $self;
204     weaken ($seek_and_destroy{$key});
205   }
206
207   END {
208     local $?; # just in case the DBI destructor changes it somehow
209
210     # destroy just the object if not native to this process
211     $_->_verify_pid for (grep
212       { defined $_ }
213       values %seek_and_destroy
214     );
215   }
216
217   sub CLONE {
218     # As per DBI's recommendation, DBIC disconnects all handles as
219     # soon as possible (DBIC will reconnect only on demand from within
220     # the thread)
221     for (values %seek_and_destroy) {
222       next unless $_;
223       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
224       $_->_dbh(undef);
225
226       $_->transaction_depth(0);
227       $_->savepoints([]);
228     }
229   }
230 }
231
232 sub DESTROY {
233   my $self = shift;
234
235   # some databases spew warnings on implicit disconnect
236   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
237   local $SIG{__WARN__} = sub {};
238   $self->_dbh(undef);
239
240   # this op is necessary, since the very last perl runtime statement
241   # triggers a global destruction shootout, and the $SIG localization
242   # may very well be destroyed before perl actually gets to do the
243   # $dbh undef
244   1;
245 }
246
247 # handle pid changes correctly - do not destroy parent's connection
248 sub _verify_pid {
249   my $self = shift;
250
251   my $pid = $self->_conn_pid;
252   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
253     $dbh->{InactiveDestroy} = 1;
254     $self->{_dbh_gen}++;
255     $self->_dbh(undef);
256     $self->transaction_depth(0);
257     $self->savepoints([]);
258   }
259
260   return;
261 }
262
263 =head2 connect_info
264
265 This method is normally called by L<DBIx::Class::Schema/connection>, which
266 encapsulates its argument list in an arrayref before passing them here.
267
268 The argument list may contain:
269
270 =over
271
272 =item *
273
274 The same 4-element argument set one would normally pass to
275 L<DBI/connect>, optionally followed by
276 L<extra attributes|/DBIx::Class specific connection attributes>
277 recognized by DBIx::Class:
278
279   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
280
281 =item *
282
283 A single code reference which returns a connected
284 L<DBI database handle|DBI/connect> optionally followed by
285 L<extra attributes|/DBIx::Class specific connection attributes> recognized
286 by DBIx::Class:
287
288   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
289
290 =item *
291
292 A single hashref with all the attributes and the dsn/user/password
293 mixed together:
294
295   $connect_info_args = [{
296     dsn => $dsn,
297     user => $user,
298     password => $pass,
299     %dbi_attributes,
300     %extra_attributes,
301   }];
302
303   $connect_info_args = [{
304     dbh_maker => sub { DBI->connect (...) },
305     %dbi_attributes,
306     %extra_attributes,
307   }];
308
309 This is particularly useful for L<Catalyst> based applications, allowing the
310 following config (L<Config::General> style):
311
312   <Model::DB>
313     schema_class   App::DB
314     <connect_info>
315       dsn          dbi:mysql:database=test
316       user         testuser
317       password     TestPass
318       AutoCommit   1
319     </connect_info>
320   </Model::DB>
321
322 The C<dsn>/C<user>/C<password> combination can be substituted by the
323 C<dbh_maker> key whose value is a coderef that returns a connected
324 L<DBI database handle|DBI/connect>
325
326 =back
327
328 Please note that the L<DBI> docs recommend that you always explicitly
329 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
330 recommends that it be set to I<1>, and that you perform transactions
331 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
332 to I<1> if you do not do explicitly set it to zero.  This is the default
333 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
334
335 =head3 DBIx::Class specific connection attributes
336
337 In addition to the standard L<DBI|DBI/ATTRIBUTES COMMON TO ALL HANDLES>
338 L<connection|DBI/Database Handle Attributes> attributes, DBIx::Class recognizes
339 the following connection options. These options can be mixed in with your other
340 L<DBI> connection attributes, or placed in a separate hashref
341 (C<\%extra_attributes>) as shown above.
342
343 Every time C<connect_info> is invoked, any previous settings for
344 these options will be cleared before setting the new ones, regardless of
345 whether any options are specified in the new C<connect_info>.
346
347
348 =over
349
350 =item on_connect_do
351
352 Specifies things to do immediately after connecting or re-connecting to
353 the database.  Its value may contain:
354
355 =over
356
357 =item a scalar
358
359 This contains one SQL statement to execute.
360
361 =item an array reference
362
363 This contains SQL statements to execute in order.  Each element contains
364 a string or a code reference that returns a string.
365
366 =item a code reference
367
368 This contains some code to execute.  Unlike code references within an
369 array reference, its return value is ignored.
370
371 =back
372
373 =item on_disconnect_do
374
375 Takes arguments in the same form as L</on_connect_do> and executes them
376 immediately before disconnecting from the database.
377
378 Note, this only runs if you explicitly call L</disconnect> on the
379 storage object.
380
381 =item on_connect_call
382
383 A more generalized form of L</on_connect_do> that calls the specified
384 C<connect_call_METHOD> methods in your storage driver.
385
386   on_connect_do => 'select 1'
387
388 is equivalent to:
389
390   on_connect_call => [ [ do_sql => 'select 1' ] ]
391
392 Its values may contain:
393
394 =over
395
396 =item a scalar
397
398 Will call the C<connect_call_METHOD> method.
399
400 =item a code reference
401
402 Will execute C<< $code->($storage) >>
403
404 =item an array reference
405
406 Each value can be a method name or code reference.
407
408 =item an array of arrays
409
410 For each array, the first item is taken to be the C<connect_call_> method name
411 or code reference, and the rest are parameters to it.
412
413 =back
414
415 Some predefined storage methods you may use:
416
417 =over
418
419 =item do_sql
420
421 Executes a SQL string or a code reference that returns a SQL string. This is
422 what L</on_connect_do> and L</on_disconnect_do> use.
423
424 It can take:
425
426 =over
427
428 =item a scalar
429
430 Will execute the scalar as SQL.
431
432 =item an arrayref
433
434 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
435 attributes hashref and bind values.
436
437 =item a code reference
438
439 Will execute C<< $code->($storage) >> and execute the return array refs as
440 above.
441
442 =back
443
444 =item datetime_setup
445
446 Execute any statements necessary to initialize the database session to return
447 and accept datetime/timestamp values used with
448 L<DBIx::Class::InflateColumn::DateTime>.
449
450 Only necessary for some databases, see your specific storage driver for
451 implementation details.
452
453 =back
454
455 =item on_disconnect_call
456
457 Takes arguments in the same form as L</on_connect_call> and executes them
458 immediately before disconnecting from the database.
459
460 Calls the C<disconnect_call_METHOD> methods as opposed to the
461 C<connect_call_METHOD> methods called by L</on_connect_call>.
462
463 Note, this only runs if you explicitly call L</disconnect> on the
464 storage object.
465
466 =item disable_sth_caching
467
468 If set to a true value, this option will disable the caching of
469 statement handles via L<DBI/prepare_cached>.
470
471 =item limit_dialect
472
473 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
474 default L</sql_limit_dialect> setting of the storage (if any). For a list
475 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
476
477 =item quote_names
478
479 When true automatically sets L</quote_char> and L</name_sep> to the characters
480 appropriate for your particular RDBMS. This option is preferred over specifying
481 L</quote_char> directly.
482
483 =item quote_char
484
485 Specifies what characters to use to quote table and column names.
486
487 C<quote_char> expects either a single character, in which case is it
488 is placed on either side of the table/column name, or an arrayref of length
489 2 in which case the table/column name is placed between the elements.
490
491 For example under MySQL you should use C<< quote_char => '`' >>, and for
492 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
493
494 =item name_sep
495
496 This parameter is only useful in conjunction with C<quote_char>, and is used to
497 specify the character that separates elements (schemas, tables, columns) from
498 each other. If unspecified it defaults to the most commonly used C<.>.
499
500 =item unsafe
501
502 This Storage driver normally installs its own C<HandleError>, sets
503 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
504 all database handles, including those supplied by a coderef.  It does this
505 so that it can have consistent and useful error behavior.
506
507 If you set this option to a true value, Storage will not do its usual
508 modifications to the database handle's attributes, and instead relies on
509 the settings in your connect_info DBI options (or the values you set in
510 your connection coderef, in the case that you are connecting via coderef).
511
512 Note that your custom settings can cause Storage to malfunction,
513 especially if you set a C<HandleError> handler that suppresses exceptions
514 and/or disable C<RaiseError>.
515
516 =item auto_savepoint
517
518 If this option is true, L<DBIx::Class> will use savepoints when nesting
519 transactions, making it possible to recover from failure in the inner
520 transaction without having to abort all outer transactions.
521
522 =item cursor_class
523
524 Use this argument to supply a cursor class other than the default
525 L<DBIx::Class::Storage::DBI::Cursor>.
526
527 =back
528
529 Some real-life examples of arguments to L</connect_info> and
530 L<DBIx::Class::Schema/connect>
531
532   # Simple SQLite connection
533   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
534
535   # Connect via subref
536   ->connect_info([ sub { DBI->connect(...) } ]);
537
538   # Connect via subref in hashref
539   ->connect_info([{
540     dbh_maker => sub { DBI->connect(...) },
541     on_connect_do => 'alter session ...',
542   }]);
543
544   # A bit more complicated
545   ->connect_info(
546     [
547       'dbi:Pg:dbname=foo',
548       'postgres',
549       'my_pg_password',
550       { AutoCommit => 1 },
551       { quote_char => q{"} },
552     ]
553   );
554
555   # Equivalent to the previous example
556   ->connect_info(
557     [
558       'dbi:Pg:dbname=foo',
559       'postgres',
560       'my_pg_password',
561       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
562     ]
563   );
564
565   # Same, but with hashref as argument
566   # See parse_connect_info for explanation
567   ->connect_info(
568     [{
569       dsn         => 'dbi:Pg:dbname=foo',
570       user        => 'postgres',
571       password    => 'my_pg_password',
572       AutoCommit  => 1,
573       quote_char  => q{"},
574       name_sep    => q{.},
575     }]
576   );
577
578   # Subref + DBIx::Class-specific connection options
579   ->connect_info(
580     [
581       sub { DBI->connect(...) },
582       {
583           quote_char => q{`},
584           name_sep => q{@},
585           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
586           disable_sth_caching => 1,
587       },
588     ]
589   );
590
591
592
593 =cut
594
595 sub connect_info {
596   my ($self, $info) = @_;
597
598   return $self->_connect_info if !$info;
599
600   $self->_connect_info($info); # copy for _connect_info
601
602   $info = $self->_normalize_connect_info($info)
603     if ref $info eq 'ARRAY';
604
605   for my $storage_opt (keys %{ $info->{storage_options} }) {
606     my $value = $info->{storage_options}{$storage_opt};
607
608     $self->$storage_opt($value);
609   }
610
611   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
612   #  the new set of options
613   $self->_sql_maker(undef);
614   $self->_sql_maker_opts({});
615
616   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
617     my $value = $info->{sql_maker_options}{$sql_maker_opt};
618
619     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
620   }
621
622   my %attrs = (
623     %{ $self->_default_dbi_connect_attributes || {} },
624     %{ $info->{attributes} || {} },
625   );
626
627   my @args = @{ $info->{arguments} };
628
629   if (keys %attrs and ref $args[0] ne 'CODE') {
630     carp
631         'You provided explicit AutoCommit => 0 in your connection_info. '
632       . 'This is almost universally a bad idea (see the footnotes of '
633       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
634       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
635       . 'this warning.'
636       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
637
638     push @args, \%attrs if keys %attrs;
639   }
640   $self->_dbi_connect_info(\@args);
641
642   # FIXME - dirty:
643   # save attributes them in a separate accessor so they are always
644   # introspectable, even in case of a CODE $dbhmaker
645   $self->_dbic_connect_attributes (\%attrs);
646
647   return $self->_connect_info;
648 }
649
650 sub _normalize_connect_info {
651   my ($self, $info_arg) = @_;
652   my %info;
653
654   my @args = @$info_arg;  # take a shallow copy for further mutilation
655
656   # combine/pre-parse arguments depending on invocation style
657
658   my %attrs;
659   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
660     %attrs = %{ $args[1] || {} };
661     @args = $args[0];
662   }
663   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
664     %attrs = %{$args[0]};
665     @args = ();
666     if (my $code = delete $attrs{dbh_maker}) {
667       @args = $code;
668
669       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
670       if (@ignored) {
671         carp sprintf (
672             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
673           . "to the result of 'dbh_maker'",
674
675           join (', ', map { "'$_'" } (@ignored) ),
676         );
677       }
678     }
679     else {
680       @args = delete @attrs{qw/dsn user password/};
681     }
682   }
683   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
684     %attrs = (
685       % { $args[3] || {} },
686       % { $args[4] || {} },
687     );
688     @args = @args[0,1,2];
689   }
690
691   $info{arguments} = \@args;
692
693   my @storage_opts = grep exists $attrs{$_},
694     @storage_options, 'cursor_class';
695
696   @{ $info{storage_options} }{@storage_opts} =
697     delete @attrs{@storage_opts} if @storage_opts;
698
699   my @sql_maker_opts = grep exists $attrs{$_},
700     qw/limit_dialect quote_char name_sep quote_names/;
701
702   @{ $info{sql_maker_options} }{@sql_maker_opts} =
703     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
704
705   $info{attributes} = \%attrs if %attrs;
706
707   return \%info;
708 }
709
710 sub _default_dbi_connect_attributes () {
711   +{
712     AutoCommit => 1,
713     PrintError => 0,
714     RaiseError => 1,
715     ShowErrorStatement => 1,
716   };
717 }
718
719 =head2 on_connect_do
720
721 This method is deprecated in favour of setting via L</connect_info>.
722
723 =cut
724
725 =head2 on_disconnect_do
726
727 This method is deprecated in favour of setting via L</connect_info>.
728
729 =cut
730
731 sub _parse_connect_do {
732   my ($self, $type) = @_;
733
734   my $val = $self->$type;
735   return () if not defined $val;
736
737   my @res;
738
739   if (not ref($val)) {
740     push @res, [ 'do_sql', $val ];
741   } elsif (ref($val) eq 'CODE') {
742     push @res, $val;
743   } elsif (ref($val) eq 'ARRAY') {
744     push @res, map { [ 'do_sql', $_ ] } @$val;
745   } else {
746     $self->throw_exception("Invalid type for $type: ".ref($val));
747   }
748
749   return \@res;
750 }
751
752 =head2 dbh_do
753
754 Arguments: ($subref | $method_name), @extra_coderef_args?
755
756 Execute the given $subref or $method_name using the new exception-based
757 connection management.
758
759 The first two arguments will be the storage object that C<dbh_do> was called
760 on and a database handle to use.  Any additional arguments will be passed
761 verbatim to the called subref as arguments 2 and onwards.
762
763 Using this (instead of $self->_dbh or $self->dbh) ensures correct
764 exception handling and reconnection (or failover in future subclasses).
765
766 Your subref should have no side-effects outside of the database, as
767 there is the potential for your subref to be partially double-executed
768 if the database connection was stale/dysfunctional.
769
770 Example:
771
772   my @stuff = $schema->storage->dbh_do(
773     sub {
774       my ($storage, $dbh, @cols) = @_;
775       my $cols = join(q{, }, @cols);
776       $dbh->selectrow_array("SELECT $cols FROM foo");
777     },
778     @column_list
779   );
780
781 =cut
782
783 sub dbh_do {
784   my $self = shift;
785   my $run_target = shift;
786
787   # short circuit when we know there is no need for a runner
788   #
789   # FIXME - asumption may be wrong
790   # the rationale for the txn_depth check is that if this block is a part
791   # of a larger transaction, everything up to that point is screwed anyway
792   return $self->$run_target($self->_get_dbh, @_)
793     if $self->{_in_do_block} or $self->transaction_depth;
794
795   my $args = \@_;
796
797   DBIx::Class::Storage::BlockRunner->new(
798     storage => $self,
799     run_code => sub { $self->$run_target ($self->_get_dbh, @$args ) },
800     wrap_txn => 0,
801     retry_handler => sub { ! ( $_[0]->retried_count or $_[0]->storage->connected ) },
802   )->run;
803 }
804
805 sub txn_do {
806   $_[0]->_get_dbh; # connects or reconnects on pid change, necessary to grab correct txn_depth
807   shift->next::method(@_);
808 }
809
810 =head2 disconnect
811
812 Our C<disconnect> method also performs a rollback first if the
813 database is not in C<AutoCommit> mode.
814
815 =cut
816
817 sub disconnect {
818   my ($self) = @_;
819
820   if( $self->_dbh ) {
821     my @actions;
822
823     push @actions, ( $self->on_disconnect_call || () );
824     push @actions, $self->_parse_connect_do ('on_disconnect_do');
825
826     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
827
828     # stops the "implicit rollback on disconnect" warning
829     $self->_exec_txn_rollback unless $self->_dbh_autocommit;
830
831     %{ $self->_dbh->{CachedKids} } = ();
832     $self->_dbh->disconnect;
833     $self->_dbh(undef);
834     $self->{_dbh_gen}++;
835   }
836 }
837
838 =head2 with_deferred_fk_checks
839
840 =over 4
841
842 =item Arguments: C<$coderef>
843
844 =item Return Value: The return value of $coderef
845
846 =back
847
848 Storage specific method to run the code ref with FK checks deferred or
849 in MySQL's case disabled entirely.
850
851 =cut
852
853 # Storage subclasses should override this
854 sub with_deferred_fk_checks {
855   my ($self, $sub) = @_;
856   $sub->();
857 }
858
859 =head2 connected
860
861 =over
862
863 =item Arguments: none
864
865 =item Return Value: 1|0
866
867 =back
868
869 Verifies that the current database handle is active and ready to execute
870 an SQL statement (e.g. the connection did not get stale, server is still
871 answering, etc.) This method is used internally by L</dbh>.
872
873 =cut
874
875 sub connected {
876   my $self = shift;
877   return 0 unless $self->_seems_connected;
878
879   #be on the safe side
880   local $self->_dbh->{RaiseError} = 1;
881
882   return $self->_ping;
883 }
884
885 sub _seems_connected {
886   my $self = shift;
887
888   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
889
890   my $dbh = $self->_dbh
891     or return 0;
892
893   return $dbh->FETCH('Active');
894 }
895
896 sub _ping {
897   my $self = shift;
898
899   my $dbh = $self->_dbh or return 0;
900
901   return $dbh->ping;
902 }
903
904 sub ensure_connected {
905   my ($self) = @_;
906
907   unless ($self->connected) {
908     $self->_populate_dbh;
909   }
910 }
911
912 =head2 dbh
913
914 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
915 is guaranteed to be healthy by implicitly calling L</connected>, and if
916 necessary performing a reconnection before returning. Keep in mind that this
917 is very B<expensive> on some database engines. Consider using L</dbh_do>
918 instead.
919
920 =cut
921
922 sub dbh {
923   my ($self) = @_;
924
925   if (not $self->_dbh) {
926     $self->_populate_dbh;
927   } else {
928     $self->ensure_connected;
929   }
930   return $self->_dbh;
931 }
932
933 # this is the internal "get dbh or connect (don't check)" method
934 sub _get_dbh {
935   my $self = shift;
936   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
937   $self->_populate_dbh unless $self->_dbh;
938   return $self->_dbh;
939 }
940
941 sub sql_maker {
942   my ($self) = @_;
943   unless ($self->_sql_maker) {
944     my $sql_maker_class = $self->sql_maker_class;
945
946     my %opts = %{$self->_sql_maker_opts||{}};
947     my $dialect =
948       $opts{limit_dialect}
949         ||
950       $self->sql_limit_dialect
951         ||
952       do {
953         my $s_class = (ref $self) || $self;
954         carp (
955           "Your storage class ($s_class) does not set sql_limit_dialect and you "
956         . 'have not supplied an explicit limit_dialect in your connection_info. '
957         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
958         . 'databases but can be (and often is) painfully slow. '
959         . "Please file an RT ticket against '$s_class' ."
960         );
961
962         'GenericSubQ';
963       }
964     ;
965
966     my ($quote_char, $name_sep);
967
968     if ($opts{quote_names}) {
969       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
970         my $s_class = (ref $self) || $self;
971         carp (
972           "You requested 'quote_names' but your storage class ($s_class) does "
973         . 'not explicitly define a default sql_quote_char and you have not '
974         . 'supplied a quote_char as part of your connection_info. DBIC will '
975         .q{default to the ANSI SQL standard quote '"', which works most of }
976         . "the time. Please file an RT ticket against '$s_class'."
977         );
978
979         '"'; # RV
980       };
981
982       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
983     }
984
985     $self->_sql_maker($sql_maker_class->new(
986       bindtype=>'columns',
987       array_datatypes => 1,
988       limit_dialect => $dialect,
989       ($quote_char ? (quote_char => $quote_char) : ()),
990       name_sep => ($name_sep || '.'),
991       %opts,
992     ));
993   }
994   return $self->_sql_maker;
995 }
996
997 # nothing to do by default
998 sub _rebless {}
999 sub _init {}
1000
1001 sub _populate_dbh {
1002   my ($self) = @_;
1003
1004   my @info = @{$self->_dbi_connect_info || []};
1005   $self->_dbh(undef); # in case ->connected failed we might get sent here
1006   $self->_dbh_details({}); # reset everything we know
1007
1008   $self->_dbh($self->_connect(@info));
1009
1010   $self->_conn_pid($$) unless DBIx::Class::_ENV_::BROKEN_FORK; # on win32 these are in fact threads
1011
1012   $self->_determine_driver;
1013
1014   # Always set the transaction depth on connect, since
1015   #  there is no transaction in progress by definition
1016   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1017
1018   $self->_run_connection_actions unless $self->{_in_determine_driver};
1019 }
1020
1021 sub _run_connection_actions {
1022   my $self = shift;
1023   my @actions;
1024
1025   push @actions, ( $self->on_connect_call || () );
1026   push @actions, $self->_parse_connect_do ('on_connect_do');
1027
1028   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1029 }
1030
1031
1032
1033 sub set_use_dbms_capability {
1034   $_[0]->set_inherited ($_[1], $_[2]);
1035 }
1036
1037 sub get_use_dbms_capability {
1038   my ($self, $capname) = @_;
1039
1040   my $use = $self->get_inherited ($capname);
1041   return defined $use
1042     ? $use
1043     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1044   ;
1045 }
1046
1047 sub set_dbms_capability {
1048   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1049 }
1050
1051 sub get_dbms_capability {
1052   my ($self, $capname) = @_;
1053
1054   my $cap = $self->_dbh_details->{capability}{$capname};
1055
1056   unless (defined $cap) {
1057     if (my $meth = $self->can ("_determine$capname")) {
1058       $cap = $self->$meth ? 1 : 0;
1059     }
1060     else {
1061       $cap = 0;
1062     }
1063
1064     $self->set_dbms_capability ($capname, $cap);
1065   }
1066
1067   return $cap;
1068 }
1069
1070 sub _server_info {
1071   my $self = shift;
1072
1073   my $info;
1074   unless ($info = $self->_dbh_details->{info}) {
1075
1076     $info = {};
1077
1078     my $server_version;
1079     try {
1080       $server_version = $self->_get_server_version;
1081     }
1082     catch {
1083       if ($self->{_in_determine_driver}) {
1084         $self->throw_exception($_);
1085       }
1086       $server_version = undef;
1087     };
1088
1089     if (defined $server_version) {
1090       $info->{dbms_version} = $server_version;
1091
1092       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1093       my @verparts = split (/\./, $numeric_version);
1094       if (
1095         @verparts
1096           &&
1097         $verparts[0] <= 999
1098       ) {
1099         # consider only up to 3 version parts, iff not more than 3 digits
1100         my @use_parts;
1101         while (@verparts && @use_parts < 3) {
1102           my $p = shift @verparts;
1103           last if $p > 999;
1104           push @use_parts, $p;
1105         }
1106         push @use_parts, 0 while @use_parts < 3;
1107
1108         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1109       }
1110     }
1111
1112     $self->_dbh_details->{info} = $info;
1113   }
1114
1115   return $info;
1116 }
1117
1118 sub _get_server_version {
1119   shift->_dbh_get_info('SQL_DBMS_VER');
1120 }
1121
1122 sub _dbh_get_info {
1123   my ($self, $info) = @_;
1124
1125   if ($info =~ /[^0-9]/) {
1126     $info = $DBI::Const::GetInfoType::GetInfoType{$info};
1127     $self->throw_exception("Info type '$_[1]' not provided by DBI::Const::GetInfoType")
1128       unless defined $info;
1129   }
1130
1131   my $res;
1132
1133   try {
1134     $res = $self->_get_dbh->get_info($info);
1135   }
1136   catch {
1137     if ($self->{_in_determine_driver}) {
1138       $self->throw_exception($_);
1139     }
1140     $res = undef;
1141   };
1142
1143   return $res;
1144 }
1145
1146 sub _describe_connection {
1147   require DBI::Const::GetInfoReturn;
1148
1149   my $self = shift;
1150   $self->ensure_connected;
1151
1152   my $res = {
1153     DBIC_DSN => $self->_dbi_connect_info->[0],
1154     DBI_VER => DBI->VERSION,
1155     DBIC_VER => DBIx::Class->VERSION,
1156     DBIC_DRIVER => ref $self,
1157   };
1158
1159   for my $inf (
1160     #keys %DBI::Const::GetInfoType::GetInfoType,
1161     qw/
1162       SQL_CURSOR_COMMIT_BEHAVIOR
1163       SQL_CURSOR_ROLLBACK_BEHAVIOR
1164       SQL_CURSOR_SENSITIVITY
1165       SQL_DATA_SOURCE_NAME
1166       SQL_DBMS_NAME
1167       SQL_DBMS_VER
1168       SQL_DEFAULT_TXN_ISOLATION
1169       SQL_DM_VER
1170       SQL_DRIVER_NAME
1171       SQL_DRIVER_ODBC_VER
1172       SQL_DRIVER_VER
1173       SQL_EXPRESSIONS_IN_ORDERBY
1174       SQL_GROUP_BY
1175       SQL_IDENTIFIER_CASE
1176       SQL_IDENTIFIER_QUOTE_CHAR
1177       SQL_MAX_CATALOG_NAME_LEN
1178       SQL_MAX_COLUMN_NAME_LEN
1179       SQL_MAX_IDENTIFIER_LEN
1180       SQL_MAX_TABLE_NAME_LEN
1181       SQL_MULTIPLE_ACTIVE_TXN
1182       SQL_MULT_RESULT_SETS
1183       SQL_NEED_LONG_DATA_LEN
1184       SQL_NON_NULLABLE_COLUMNS
1185       SQL_ODBC_VER
1186       SQL_QUALIFIER_NAME_SEPARATOR
1187       SQL_QUOTED_IDENTIFIER_CASE
1188       SQL_TXN_CAPABLE
1189       SQL_TXN_ISOLATION_OPTION
1190     /
1191   ) {
1192     my $v = $self->_dbh_get_info($inf);
1193     next unless defined $v;
1194
1195     #my $key = sprintf( '%s(%s)', $inf, $DBI::Const::GetInfoType::GetInfoType{$inf} );
1196     my $expl = DBI::Const::GetInfoReturn::Explain($inf, $v);
1197     $res->{$inf} = DBI::Const::GetInfoReturn::Format($inf, $v) . ( $expl ? " ($expl)" : '' );
1198   }
1199
1200   $res;
1201 }
1202
1203 sub _determine_driver {
1204   my ($self) = @_;
1205
1206   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1207     my $started_connected = 0;
1208     local $self->{_in_determine_driver} = 1;
1209
1210     if (ref($self) eq __PACKAGE__) {
1211       my $driver;
1212       if ($self->_dbh) { # we are connected
1213         $driver = $self->_dbh->{Driver}{Name};
1214         $started_connected = 1;
1215       }
1216       else {
1217         # if connect_info is a CODEREF, we have no choice but to connect
1218         if (ref $self->_dbi_connect_info->[0] &&
1219             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1220           $self->_populate_dbh;
1221           $driver = $self->_dbh->{Driver}{Name};
1222         }
1223         else {
1224           # try to use dsn to not require being connected, the driver may still
1225           # force a connection in _rebless to determine version
1226           # (dsn may not be supplied at all if all we do is make a mock-schema)
1227           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1228           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1229           $driver ||= $ENV{DBI_DRIVER};
1230         }
1231       }
1232
1233       if ($driver) {
1234         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1235         if ($self->load_optional_class($storage_class)) {
1236           mro::set_mro($storage_class, 'c3');
1237           bless $self, $storage_class;
1238           $self->_rebless();
1239         }
1240         else {
1241           $self->_warn_undetermined_driver(
1242             'This version of DBIC does not yet seem to supply a driver for '
1243           . "your particular RDBMS and/or connection method ('$driver')."
1244           );
1245         }
1246       }
1247       else {
1248         $self->_warn_undetermined_driver(
1249           'Unable to extract a driver name from connect info - this '
1250         . 'should not have happened.'
1251         );
1252       }
1253     }
1254
1255     $self->_driver_determined(1);
1256
1257     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1258
1259     $self->_init; # run driver-specific initializations
1260
1261     $self->_run_connection_actions
1262         if !$started_connected && defined $self->_dbh;
1263   }
1264 }
1265
1266 sub _determine_connector_driver {
1267   my ($self, $conn) = @_;
1268
1269   my $dbtype = $self->_dbh_get_info('SQL_DBMS_NAME');
1270
1271   if (not $dbtype) {
1272     $self->_warn_undetermined_driver(
1273       'Unable to retrieve RDBMS type (SQL_DBMS_NAME) of the engine behind your '
1274     . "$conn connector - this should not have happened."
1275     );
1276     return;
1277   }
1278
1279   $dbtype =~ s/\W/_/gi;
1280
1281   my $subclass = "DBIx::Class::Storage::DBI::${conn}::${dbtype}";
1282   return if $self->isa($subclass);
1283
1284   if ($self->load_optional_class($subclass)) {
1285     bless $self, $subclass;
1286     $self->_rebless;
1287   }
1288   else {
1289     $self->_warn_undetermined_driver(
1290       'This version of DBIC does not yet seem to supply a driver for '
1291     . "your particular RDBMS and/or connection method ('$conn/$dbtype')."
1292     );
1293   }
1294 }
1295
1296 sub _warn_undetermined_driver {
1297   my ($self, $msg) = @_;
1298
1299   require Data::Dumper::Concise;
1300
1301   carp_once ($msg . ' While we will attempt to continue anyway, the results '
1302   . 'are likely to be underwhelming. Please upgrade DBIC, and if this message '
1303   . "does not go away, file a bugreport including the following info:\n"
1304   . Data::Dumper::Concise::Dumper($self->_describe_connection)
1305   );
1306 }
1307
1308 sub _do_connection_actions {
1309   my $self          = shift;
1310   my $method_prefix = shift;
1311   my $call          = shift;
1312
1313   if (not ref($call)) {
1314     my $method = $method_prefix . $call;
1315     $self->$method(@_);
1316   } elsif (ref($call) eq 'CODE') {
1317     $self->$call(@_);
1318   } elsif (ref($call) eq 'ARRAY') {
1319     if (ref($call->[0]) ne 'ARRAY') {
1320       $self->_do_connection_actions($method_prefix, $_) for @$call;
1321     } else {
1322       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1323     }
1324   } else {
1325     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1326   }
1327
1328   return $self;
1329 }
1330
1331 sub connect_call_do_sql {
1332   my $self = shift;
1333   $self->_do_query(@_);
1334 }
1335
1336 sub disconnect_call_do_sql {
1337   my $self = shift;
1338   $self->_do_query(@_);
1339 }
1340
1341 # override in db-specific backend when necessary
1342 sub connect_call_datetime_setup { 1 }
1343
1344 sub _do_query {
1345   my ($self, $action) = @_;
1346
1347   if (ref $action eq 'CODE') {
1348     $action = $action->($self);
1349     $self->_do_query($_) foreach @$action;
1350   }
1351   else {
1352     # Most debuggers expect ($sql, @bind), so we need to exclude
1353     # the attribute hash which is the second argument to $dbh->do
1354     # furthermore the bind values are usually to be presented
1355     # as named arrayref pairs, so wrap those here too
1356     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1357     my $sql = shift @do_args;
1358     my $attrs = shift @do_args;
1359     my @bind = map { [ undef, $_ ] } @do_args;
1360
1361     $self->dbh_do(sub {
1362       $_[0]->_query_start($sql, \@bind);
1363       $_[1]->do($sql, $attrs, @do_args);
1364       $_[0]->_query_end($sql, \@bind);
1365     });
1366   }
1367
1368   return $self;
1369 }
1370
1371 sub _connect {
1372   my ($self, @info) = @_;
1373
1374   $self->throw_exception("You failed to provide any connection info")
1375     if !@info;
1376
1377   my ($old_connect_via, $dbh);
1378
1379   local $DBI::connect_via = 'connect' if $INC{'Apache/DBI.pm'} && $ENV{MOD_PERL};
1380
1381   try {
1382     if(ref $info[0] eq 'CODE') {
1383       $dbh = $info[0]->();
1384     }
1385     else {
1386       require DBI;
1387       $dbh = DBI->connect(@info);
1388     }
1389
1390     if (!$dbh) {
1391       die $DBI::errstr;
1392     }
1393
1394     unless ($self->unsafe) {
1395
1396       $self->throw_exception(
1397         'Refusing clobbering of {HandleError} installed on externally supplied '
1398        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1399       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1400
1401       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1402       # request, or an external handle. Complain and set anyway
1403       unless ($dbh->{RaiseError}) {
1404         carp( ref $info[0] eq 'CODE'
1405
1406           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1407            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1408            .'attribute has been supplied'
1409
1410           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1411            .'unsafe => 1. Toggling RaiseError back to true'
1412         );
1413
1414         $dbh->{RaiseError} = 1;
1415       }
1416
1417       # this odd anonymous coderef dereference is in fact really
1418       # necessary to avoid the unwanted effect described in perl5
1419       # RT#75792
1420       sub {
1421         my $weak_self = $_[0];
1422         weaken $weak_self;
1423
1424         # the coderef is blessed so we can distinguish it from externally
1425         # supplied handles (which must be preserved)
1426         $_[1]->{HandleError} = bless sub {
1427           if ($weak_self) {
1428             $weak_self->throw_exception("DBI Exception: $_[0]");
1429           }
1430           else {
1431             # the handler may be invoked by something totally out of
1432             # the scope of DBIC
1433             DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1434           }
1435         }, '__DBIC__DBH__ERROR__HANDLER__';
1436       }->($self, $dbh);
1437     }
1438   }
1439   catch {
1440     $self->throw_exception("DBI Connection failed: $_")
1441   };
1442
1443   $self->_dbh_autocommit($dbh->{AutoCommit});
1444   $dbh;
1445 }
1446
1447 sub txn_begin {
1448   my $self = shift;
1449
1450   # this means we have not yet connected and do not know the AC status
1451   # (e.g. coderef $dbh), need a full-fledged connection check
1452   if (! defined $self->_dbh_autocommit) {
1453     $self->ensure_connected;
1454   }
1455   # Otherwise simply connect or re-connect on pid changes
1456   else {
1457     $self->_get_dbh;
1458   }
1459
1460   $self->next::method(@_);
1461 }
1462
1463 sub _exec_txn_begin {
1464   my $self = shift;
1465
1466   # if the user is utilizing txn_do - good for him, otherwise we need to
1467   # ensure that the $dbh is healthy on BEGIN.
1468   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1469   # will be replaced by a failure of begin_work itself (which will be
1470   # then retried on reconnect)
1471   if ($self->{_in_do_block}) {
1472     $self->_dbh->begin_work;
1473   } else {
1474     $self->dbh_do(sub { $_[1]->begin_work });
1475   }
1476 }
1477
1478 sub txn_commit {
1479   my $self = shift;
1480
1481   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1482   $self->throw_exception("Unable to txn_commit() on a disconnected storage")
1483     unless $self->_dbh;
1484
1485   # esoteric case for folks using external $dbh handles
1486   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1487     carp "Storage transaction_depth 0 does not match "
1488         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1489     $self->transaction_depth(1);
1490   }
1491
1492   $self->next::method(@_);
1493
1494   # if AutoCommit is disabled txn_depth never goes to 0
1495   # as a new txn is started immediately on commit
1496   $self->transaction_depth(1) if (
1497     !$self->transaction_depth
1498       and
1499     defined $self->_dbh_autocommit
1500       and
1501     ! $self->_dbh_autocommit
1502   );
1503 }
1504
1505 sub _exec_txn_commit {
1506   shift->_dbh->commit;
1507 }
1508
1509 sub txn_rollback {
1510   my $self = shift;
1511
1512   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1513   $self->throw_exception("Unable to txn_rollback() on a disconnected storage")
1514     unless $self->_dbh;
1515
1516   # esoteric case for folks using external $dbh handles
1517   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1518     carp "Storage transaction_depth 0 does not match "
1519         ."false AutoCommit of $self->{_dbh}, attempting ROLLBACK anyway";
1520     $self->transaction_depth(1);
1521   }
1522
1523   $self->next::method(@_);
1524
1525   # if AutoCommit is disabled txn_depth never goes to 0
1526   # as a new txn is started immediately on commit
1527   $self->transaction_depth(1) if (
1528     !$self->transaction_depth
1529       and
1530     defined $self->_dbh_autocommit
1531       and
1532     ! $self->_dbh_autocommit
1533   );
1534 }
1535
1536 sub _exec_txn_rollback {
1537   shift->_dbh->rollback;
1538 }
1539
1540 # generate some identical methods
1541 for my $meth (qw/svp_begin svp_release svp_rollback/) {
1542   no strict qw/refs/;
1543   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
1544     my $self = shift;
1545     $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1546     $self->throw_exception("Unable to $meth() on a disconnected storage")
1547       unless $self->_dbh;
1548     $self->next::method(@_);
1549   };
1550 }
1551
1552 # This used to be the top-half of _execute.  It was split out to make it
1553 #  easier to override in NoBindVars without duping the rest.  It takes up
1554 #  all of _execute's args, and emits $sql, @bind.
1555 sub _prep_for_execute {
1556   #my ($self, $op, $ident, $args) = @_;
1557   return shift->_gen_sql_bind(@_)
1558 }
1559
1560 sub _gen_sql_bind {
1561   my ($self, $op, $ident, $args) = @_;
1562
1563   my ($sql, @bind) = $self->sql_maker->$op(
1564     blessed($ident) ? $ident->from : $ident,
1565     @$args,
1566   );
1567
1568   if (
1569     ! $ENV{DBIC_DT_SEARCH_OK}
1570       and
1571     $op eq 'select'
1572       and
1573     first { blessed($_->[1]) && $_->[1]->isa('DateTime') } @bind
1574   ) {
1575     carp_unique 'DateTime objects passed to search() are not supported '
1576       . 'properly (InflateColumn::DateTime formats and settings are not '
1577       . 'respected.) See "Formatting DateTime objects in queries" in '
1578       . 'DBIx::Class::Manual::Cookbook. To disable this warning for good '
1579       . 'set $ENV{DBIC_DT_SEARCH_OK} to true'
1580   }
1581
1582   return( $sql, $self->_resolve_bindattrs(
1583     $ident, [ @{$args->[2]{bind}||[]}, @bind ]
1584   ));
1585 }
1586
1587 sub _resolve_bindattrs {
1588   my ($self, $ident, $bind, $colinfos) = @_;
1589
1590   $colinfos ||= {};
1591
1592   my $resolve_bindinfo = sub {
1593     #my $infohash = shift;
1594
1595     %$colinfos = %{ $self->_resolve_column_info($ident) }
1596       unless keys %$colinfos;
1597
1598     my $ret;
1599     if (my $col = $_[0]->{dbic_colname}) {
1600       $ret = { %{$_[0]} };
1601
1602       $ret->{sqlt_datatype} ||= $colinfos->{$col}{data_type}
1603         if $colinfos->{$col}{data_type};
1604
1605       $ret->{sqlt_size} ||= $colinfos->{$col}{size}
1606         if $colinfos->{$col}{size};
1607     }
1608
1609     $ret || $_[0];
1610   };
1611
1612   return [ map {
1613     if (ref $_ ne 'ARRAY') {
1614       [{}, $_]
1615     }
1616     elsif (! defined $_->[0]) {
1617       [{}, $_->[1]]
1618     }
1619     elsif (ref $_->[0] eq 'HASH') {
1620       [
1621         ($_->[0]{dbd_attrs} or $_->[0]{sqlt_datatype}) ? $_->[0] : $resolve_bindinfo->($_->[0]),
1622         $_->[1]
1623       ]
1624     }
1625     elsif (ref $_->[0] eq 'SCALAR') {
1626       [ { sqlt_datatype => ${$_->[0]} }, $_->[1] ]
1627     }
1628     else {
1629       [ $resolve_bindinfo->({ dbic_colname => $_->[0] }), $_->[1] ]
1630     }
1631   } @$bind ];
1632 }
1633
1634 sub _format_for_trace {
1635   #my ($self, $bind) = @_;
1636
1637   ### Turn @bind from something like this:
1638   ###   ( [ "artist", 1 ], [ \%attrs, 3 ] )
1639   ### to this:
1640   ###   ( "'1'", "'3'" )
1641
1642   map {
1643     defined( $_ && $_->[1] )
1644       ? qq{'$_->[1]'}
1645       : q{NULL}
1646   } @{$_[1] || []};
1647 }
1648
1649 sub _query_start {
1650   my ( $self, $sql, $bind ) = @_;
1651
1652   $self->debugobj->query_start( $sql, $self->_format_for_trace($bind) )
1653     if $self->debug;
1654 }
1655
1656 sub _query_end {
1657   my ( $self, $sql, $bind ) = @_;
1658
1659   $self->debugobj->query_end( $sql, $self->_format_for_trace($bind) )
1660     if $self->debug;
1661 }
1662
1663 my $sba_compat;
1664 sub _dbi_attrs_for_bind {
1665   my ($self, $ident, $bind) = @_;
1666
1667   if (! defined $sba_compat) {
1668     $self->_determine_driver;
1669     $sba_compat = $self->can('source_bind_attributes') == \&source_bind_attributes
1670       ? 0
1671       : 1
1672     ;
1673   }
1674
1675   my $sba_attrs;
1676   if ($sba_compat) {
1677     my $class = ref $self;
1678     carp_unique (
1679       "The source_bind_attributes() override in $class relies on a deprecated codepath. "
1680      .'You are strongly advised to switch your code to override bind_attribute_by_datatype() '
1681      .'instead. This legacy compat shim will also disappear some time before DBIC 0.09'
1682     );
1683
1684     my $sba_attrs = $self->source_bind_attributes
1685   }
1686
1687   my @attrs;
1688
1689   for (map { $_->[0] } @$bind) {
1690     push @attrs, do {
1691       if (exists $_->{dbd_attrs}) {
1692         $_->{dbd_attrs}
1693       }
1694       elsif($_->{sqlt_datatype}) {
1695         # cache the result in the dbh_details hash, as it can not change unless
1696         # we connect to something else
1697         my $cache = $self->_dbh_details->{_datatype_map_cache} ||= {};
1698         if (not exists $cache->{$_->{sqlt_datatype}}) {
1699           $cache->{$_->{sqlt_datatype}} = $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
1700         }
1701         $cache->{$_->{sqlt_datatype}};
1702       }
1703       elsif ($sba_attrs and $_->{dbic_colname}) {
1704         $sba_attrs->{$_->{dbic_colname}} || undef;
1705       }
1706       else {
1707         undef;  # always push something at this position
1708       }
1709     }
1710   }
1711
1712   return \@attrs;
1713 }
1714
1715 sub _execute {
1716   my ($self, $op, $ident, @args) = @_;
1717
1718   my ($sql, $bind) = $self->_prep_for_execute($op, $ident, \@args);
1719
1720   shift->dbh_do(    # retry over disconnects
1721     '_dbh_execute',
1722     $sql,
1723     $bind,
1724     $self->_dbi_attrs_for_bind($ident, $bind)
1725   );
1726 }
1727
1728 sub _dbh_execute {
1729   my ($self, undef, $sql, $bind, $bind_attrs) = @_;
1730
1731   $self->_query_start( $sql, $bind );
1732   my $sth = $self->_sth($sql);
1733
1734   for my $i (0 .. $#$bind) {
1735     if (ref $bind->[$i][1] eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1736       $sth->bind_param_inout(
1737         $i + 1, # bind params counts are 1-based
1738         $bind->[$i][1],
1739         $bind->[$i][0]{dbd_size} || $self->_max_column_bytesize($bind->[$i][0]), # size
1740         $bind_attrs->[$i],
1741       );
1742     }
1743     else {
1744       $sth->bind_param(
1745         $i + 1,
1746         (ref $bind->[$i][1] and overload::Method($bind->[$i][1], '""'))
1747           ? "$bind->[$i][1]"
1748           : $bind->[$i][1]
1749         ,
1750         $bind_attrs->[$i],
1751       );
1752     }
1753   }
1754
1755   # Can this fail without throwing an exception anyways???
1756   my $rv = $sth->execute();
1757   $self->throw_exception(
1758     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1759   ) if !$rv;
1760
1761   $self->_query_end( $sql, $bind );
1762
1763   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1764 }
1765
1766 sub _prefetch_autovalues {
1767   my ($self, $source, $to_insert) = @_;
1768
1769   my $colinfo = $source->columns_info;
1770
1771   my %values;
1772   for my $col (keys %$colinfo) {
1773     if (
1774       $colinfo->{$col}{auto_nextval}
1775         and
1776       (
1777         ! exists $to_insert->{$col}
1778           or
1779         ref $to_insert->{$col} eq 'SCALAR'
1780           or
1781         (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1782       )
1783     ) {
1784       $values{$col} = $self->_sequence_fetch(
1785         'NEXTVAL',
1786         ( $colinfo->{$col}{sequence} ||=
1787             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1788         ),
1789       );
1790     }
1791   }
1792
1793   \%values;
1794 }
1795
1796 sub insert {
1797   my ($self, $source, $to_insert) = @_;
1798
1799   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1800
1801   # fuse the values, but keep a separate list of prefetched_values so that
1802   # they can be fused once again with the final return
1803   $to_insert = { %$to_insert, %$prefetched_values };
1804
1805   # FIXME - we seem to assume undef values as non-supplied. This is wrong.
1806   # Investigate what does it take to s/defined/exists/
1807   my $col_infos = $source->columns_info;
1808   my %pcols = map { $_ => 1 } $source->primary_columns;
1809   my (%retrieve_cols, $autoinc_supplied, $retrieve_autoinc_col);
1810   for my $col ($source->columns) {
1811     if ($col_infos->{$col}{is_auto_increment}) {
1812       $autoinc_supplied ||= 1 if defined $to_insert->{$col};
1813       $retrieve_autoinc_col ||= $col unless $autoinc_supplied;
1814     }
1815
1816     # nothing to retrieve when explicit values are supplied
1817     next if (defined $to_insert->{$col} and ! (
1818       ref $to_insert->{$col} eq 'SCALAR'
1819         or
1820       (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1821     ));
1822
1823     # the 'scalar keys' is a trick to preserve the ->columns declaration order
1824     $retrieve_cols{$col} = scalar keys %retrieve_cols if (
1825       $pcols{$col}
1826         or
1827       $col_infos->{$col}{retrieve_on_insert}
1828     );
1829   };
1830
1831   local $self->{_autoinc_supplied_for_op} = $autoinc_supplied;
1832   local $self->{_perform_autoinc_retrieval} = $retrieve_autoinc_col;
1833
1834   my ($sqla_opts, @ir_container);
1835   if (%retrieve_cols and $self->_use_insert_returning) {
1836     $sqla_opts->{returning_container} = \@ir_container
1837       if $self->_use_insert_returning_bound;
1838
1839     $sqla_opts->{returning} = [
1840       sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
1841     ];
1842   }
1843
1844   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
1845
1846   my %returned_cols = %$to_insert;
1847   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
1848     @ir_container = try {
1849       local $SIG{__WARN__} = sub {};
1850       my @r = $sth->fetchrow_array;
1851       $sth->finish;
1852       @r;
1853     } unless @ir_container;
1854
1855     @returned_cols{@$retlist} = @ir_container if @ir_container;
1856   }
1857   else {
1858     # pull in PK if needed and then everything else
1859     if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
1860
1861       $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
1862         unless $self->can('last_insert_id');
1863
1864       my @pri_values = $self->last_insert_id($source, @missing_pri);
1865
1866       $self->throw_exception( "Can't get last insert id" )
1867         unless (@pri_values == @missing_pri);
1868
1869       @returned_cols{@missing_pri} = @pri_values;
1870       delete $retrieve_cols{$_} for @missing_pri;
1871     }
1872
1873     # if there is more left to pull
1874     if (%retrieve_cols) {
1875       $self->throw_exception(
1876         'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
1877       ) unless %pcols;
1878
1879       my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
1880
1881       my $cur = DBIx::Class::ResultSet->new($source, {
1882         where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
1883         select => \@left_to_fetch,
1884       })->cursor;
1885
1886       @returned_cols{@left_to_fetch} = $cur->next;
1887
1888       $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
1889         if scalar $cur->next;
1890     }
1891   }
1892
1893   return { %$prefetched_values, %returned_cols };
1894 }
1895
1896 sub insert_bulk {
1897   my ($self, $source, $cols, $data) = @_;
1898
1899   my @col_range = (0..$#$cols);
1900
1901   # FIXME - perhaps this is not even needed? does DBI stringify?
1902   #
1903   # forcibly stringify whatever is stringifiable
1904   # ResultSet::populate() hands us a copy - safe to mangle
1905   for my $r (0 .. $#$data) {
1906     for my $c (0 .. $#{$data->[$r]}) {
1907       $data->[$r][$c] = "$data->[$r][$c]"
1908         if ( ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
1909     }
1910   }
1911
1912   my $colinfos = $source->columns_info($cols);
1913
1914   local $self->{_autoinc_supplied_for_op} =
1915     (first { $_->{is_auto_increment} } values %$colinfos)
1916       ? 1
1917       : 0
1918   ;
1919
1920   # get a slice type index based on first row of data
1921   # a "column" in this context may refer to more than one bind value
1922   # e.g. \[ '?, ?', [...], [...] ]
1923   #
1924   # construct the value type index - a description of values types for every
1925   # per-column slice of $data:
1926   #
1927   # nonexistent - nonbind literal
1928   # 0 - regular value
1929   # [] of bindattrs - resolved attribute(s) of bind(s) passed via literal+bind \[] combo
1930   #
1931   # also construct the column hash to pass to the SQL generator. For plain
1932   # (non literal) values - convert the members of the first row into a
1933   # literal+bind combo, with extra positional info in the bind attr hashref.
1934   # This will allow us to match the order properly, and is so contrived
1935   # because a user-supplied literal/bind (or something else specific to a
1936   # resultsource and/or storage driver) can inject extra binds along the
1937   # way, so one can't rely on "shift positions" ordering at all. Also we
1938   # can't just hand SQLA a set of some known "values" (e.g. hashrefs that
1939   # can be later matched up by address), because we want to supply a real
1940   # value on which perhaps e.g. datatype checks will be performed
1941   my ($proto_data, $value_type_idx);
1942   for my $i (@col_range) {
1943     my $colname = $cols->[$i];
1944     if (ref $data->[0][$i] eq 'SCALAR') {
1945       # no bind value at all - no type
1946
1947       $proto_data->{$colname} = $data->[0][$i];
1948     }
1949     elsif (ref $data->[0][$i] eq 'REF' and ref ${$data->[0][$i]} eq 'ARRAY' ) {
1950       # repack, so we don't end up mangling the original \[]
1951       my ($sql, @bind) = @${$data->[0][$i]};
1952
1953       # normalization of user supplied stuff
1954       my $resolved_bind = $self->_resolve_bindattrs(
1955         $source, \@bind, $colinfos,
1956       );
1957
1958       # store value-less (attrs only) bind info - we will be comparing all
1959       # supplied binds against this for sanity
1960       $value_type_idx->{$i} = [ map { $_->[0] } @$resolved_bind ];
1961
1962       $proto_data->{$colname} = \[ $sql, map { [
1963         # inject slice order to use for $proto_bind construction
1964           { %{$resolved_bind->[$_][0]}, _bind_data_slice_idx => $i }
1965             =>
1966           $resolved_bind->[$_][1]
1967         ] } (0 .. $#bind)
1968       ];
1969     }
1970     else {
1971       $value_type_idx->{$i} = 0;
1972
1973       $proto_data->{$colname} = \[ '?', [
1974         { dbic_colname => $colname, _bind_data_slice_idx => $i }
1975           =>
1976         $data->[0][$i]
1977       ] ];
1978     }
1979   }
1980
1981   my ($sql, $proto_bind) = $self->_prep_for_execute (
1982     'insert',
1983     $source,
1984     [ $proto_data ],
1985   );
1986
1987   if (! @$proto_bind and keys %$value_type_idx) {
1988     # if the bindlist is empty and we had some dynamic binds, this means the
1989     # storage ate them away (e.g. the NoBindVars component) and interpolated
1990     # them directly into the SQL. This obviously can't be good for multi-inserts
1991     $self->throw_exception('Cannot insert_bulk without support for placeholders');
1992   }
1993
1994   # sanity checks
1995   # FIXME - devise a flag "no babysitting" or somesuch to shut this off
1996   #
1997   # use an error reporting closure for convenience (less to pass)
1998   my $bad_slice_report_cref = sub {
1999     my ($msg, $r_idx, $c_idx) = @_;
2000     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
2001       $msg,
2002       $cols->[$c_idx],
2003       do {
2004         require Data::Dumper::Concise;
2005         local $Data::Dumper::Maxdepth = 5;
2006         Data::Dumper::Concise::Dumper ({
2007           map { $cols->[$_] =>
2008             $data->[$r_idx][$_]
2009           } @col_range
2010         }),
2011       }
2012     );
2013   };
2014
2015   for my $col_idx (@col_range) {
2016     my $reference_val = $data->[0][$col_idx];
2017
2018     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
2019       my $val = $data->[$row_idx][$col_idx];
2020
2021       if (! exists $value_type_idx->{$col_idx}) { # literal no binds
2022         if (ref $val ne 'SCALAR') {
2023           $bad_slice_report_cref->(
2024             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
2025             $row_idx,
2026             $col_idx,
2027           );
2028         }
2029         elsif ($$val ne $$reference_val) {
2030           $bad_slice_report_cref->(
2031             "Inconsistent literal SQL value (expecting \\'$$reference_val')",
2032             $row_idx,
2033             $col_idx,
2034           );
2035         }
2036       }
2037       elsif (! $value_type_idx->{$col_idx} ) {  # regular non-literal value
2038         if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
2039           $bad_slice_report_cref->("Literal SQL found where a plain bind value is expected", $row_idx, $col_idx);
2040         }
2041       }
2042       else {  # binds from a \[], compare type and attrs
2043         if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
2044           $bad_slice_report_cref->(
2045             "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
2046             $row_idx,
2047             $col_idx,
2048           );
2049         }
2050         # start drilling down and bail out early on identical refs
2051         elsif (
2052           $reference_val != $val
2053             or
2054           $$reference_val != $$val
2055         ) {
2056           if (${$val}->[0] ne ${$reference_val}->[0]) {
2057             $bad_slice_report_cref->(
2058               "Inconsistent literal/bind SQL (expecting \\['${$reference_val}->[0]', ... ])",
2059               $row_idx,
2060               $col_idx,
2061             );
2062           }
2063           # need to check the bind attrs - a bind will happen only once for
2064           # the entire dataset, so any changes further down will be ignored.
2065           elsif (! Data::Compare::Compare(
2066             $value_type_idx->{$col_idx},
2067             [
2068               map
2069               { $_->[0] }
2070               @{$self->_resolve_bindattrs(
2071                 $source, [ @{$$val}[1 .. $#$$val] ], $colinfos,
2072               )}
2073             ],
2074           )) {
2075             $bad_slice_report_cref->(
2076               'Differing bind attributes on literal/bind values not supported',
2077               $row_idx,
2078               $col_idx,
2079             );
2080           }
2081         }
2082       }
2083     }
2084   }
2085
2086   # neither _dbh_execute_for_fetch, nor _dbh_execute_inserts_with_no_binds
2087   # are atomic (even if execute_for_fetch is a single call). Thus a safety
2088   # scope guard
2089   my $guard = $self->txn_scope_guard;
2090
2091   $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
2092   my $sth = $self->_sth($sql);
2093   my $rv = do {
2094     if (@$proto_bind) {
2095       # proto bind contains the information on which pieces of $data to pull
2096       # $cols is passed in only for prettier error-reporting
2097       $self->_dbh_execute_for_fetch( $source, $sth, $proto_bind, $cols, $data );
2098     }
2099     else {
2100       # bind_param_array doesn't work if there are no binds
2101       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
2102     }
2103   };
2104
2105   $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
2106
2107   $guard->commit;
2108
2109   return wantarray ? ($rv, $sth, @$proto_bind) : $rv;
2110 }
2111
2112 # execute_for_fetch is capable of returning data just fine (it means it
2113 # can be used for INSERT...RETURNING and UPDATE...RETURNING. Since this
2114 # is the void-populate fast-path we will just ignore this altogether
2115 # for the time being.
2116 sub _dbh_execute_for_fetch {
2117   my ($self, $source, $sth, $proto_bind, $cols, $data) = @_;
2118
2119   my @idx_range = ( 0 .. $#$proto_bind );
2120
2121   # If we have any bind attributes to take care of, we will bind the
2122   # proto-bind data (which will never be used by execute_for_fetch)
2123   # However since column bindtypes are "sticky", this is sufficient
2124   # to get the DBD to apply the bindtype to all values later on
2125
2126   my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
2127
2128   for my $i (@idx_range) {
2129     $sth->bind_param (
2130       $i+1, # DBI bind indexes are 1-based
2131       $proto_bind->[$i][1],
2132       $bind_attrs->[$i],
2133     ) if defined $bind_attrs->[$i];
2134   }
2135
2136   # At this point $data slots named in the _bind_data_slice_idx of
2137   # each piece of $proto_bind are either \[]s or plain values to be
2138   # passed in. Construct the dispensing coderef. *NOTE* the order
2139   # of $data will differ from this of the ?s in the SQL (due to
2140   # alphabetical ordering by colname). We actually do want to
2141   # preserve this behavior so that prepare_cached has a better
2142   # chance of matching on unrelated calls
2143   my %data_reorder = map { $proto_bind->[$_][0]{_bind_data_slice_idx} => $_ } @idx_range;
2144
2145   my $fetch_row_idx = -1; # saner loop this way
2146   my $fetch_tuple = sub {
2147     return undef if ++$fetch_row_idx > $#$data;
2148
2149     return [ map
2150       { (ref $_ eq 'REF' and ref $$_ eq 'ARRAY')
2151         ? map { $_->[-1] } @{$$_}[1 .. $#$$_]
2152         : $_
2153       }
2154       map
2155         { $data->[$fetch_row_idx][$_]}
2156         sort
2157           { $data_reorder{$a} <=> $data_reorder{$b} }
2158           keys %data_reorder
2159     ];
2160   };
2161
2162   my $tuple_status = [];
2163   my ($rv, $err);
2164   try {
2165     $rv = $sth->execute_for_fetch(
2166       $fetch_tuple,
2167       $tuple_status,
2168     );
2169   }
2170   catch {
2171     $err = shift;
2172   };
2173
2174   # Not all DBDs are create equal. Some throw on error, some return
2175   # an undef $rv, and some set $sth->err - try whatever we can
2176   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
2177     ! defined $err
2178       and
2179     ( !defined $rv or $sth->err )
2180   );
2181
2182   # Statement must finish even if there was an exception.
2183   try {
2184     $sth->finish
2185   }
2186   catch {
2187     $err = shift unless defined $err
2188   };
2189
2190   if (defined $err) {
2191     my $i = 0;
2192     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
2193
2194     $self->throw_exception("Unexpected populate error: $err")
2195       if ($i > $#$tuple_status);
2196
2197     require Data::Dumper::Concise;
2198     $self->throw_exception(sprintf "execute_for_fetch() aborted with '%s' at populate slice:\n%s",
2199       ($tuple_status->[$i][1] || $err),
2200       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
2201     );
2202   }
2203
2204   return $rv;
2205 }
2206
2207 sub _dbh_execute_inserts_with_no_binds {
2208   my ($self, $sth, $count) = @_;
2209
2210   my $err;
2211   try {
2212     my $dbh = $self->_get_dbh;
2213     local $dbh->{RaiseError} = 1;
2214     local $dbh->{PrintError} = 0;
2215
2216     $sth->execute foreach 1..$count;
2217   }
2218   catch {
2219     $err = shift;
2220   };
2221
2222   # Make sure statement is finished even if there was an exception.
2223   try {
2224     $sth->finish
2225   }
2226   catch {
2227     $err = shift unless defined $err;
2228   };
2229
2230   $self->throw_exception($err) if defined $err;
2231
2232   return $count;
2233 }
2234
2235 sub update {
2236   #my ($self, $source, @args) = @_;
2237   shift->_execute('update', @_);
2238 }
2239
2240
2241 sub delete {
2242   #my ($self, $source, @args) = @_;
2243   shift->_execute('delete', @_);
2244 }
2245
2246 sub _select {
2247   my $self = shift;
2248   $self->_execute($self->_select_args(@_));
2249 }
2250
2251 sub _select_args_to_query {
2252   my $self = shift;
2253
2254   $self->throw_exception(
2255     "Unable to generate limited query representation with 'software_limit' enabled"
2256   ) if ($_[3]->{software_limit} and ($_[3]->{offset} or $_[3]->{rows}) );
2257
2258   # my ($op, $ident, $select, $cond, $rs_attrs, $rows, $offset)
2259   #  = $self->_select_args($ident, $select, $cond, $attrs);
2260   my ($op, $ident, @args) =
2261     $self->_select_args(@_);
2262
2263   # my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2264   my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, \@args);
2265   $prepared_bind ||= [];
2266
2267   return wantarray
2268     ? ($sql, $prepared_bind)
2269     : \[ "($sql)", @$prepared_bind ]
2270   ;
2271 }
2272
2273 sub _select_args {
2274   my ($self, $ident, $select, $where, $attrs) = @_;
2275
2276   my $sql_maker = $self->sql_maker;
2277   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2278
2279   $attrs = {
2280     %$attrs,
2281     select => $select,
2282     from => $ident,
2283     where => $where,
2284     $rs_alias && $alias2source->{$rs_alias}
2285       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2286       : ()
2287     ,
2288   };
2289
2290   # Sanity check the attributes (SQLMaker does it too, but
2291   # in case of a software_limit we'll never reach there)
2292   if (defined $attrs->{offset}) {
2293     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2294       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2295   }
2296
2297   if (defined $attrs->{rows}) {
2298     $self->throw_exception("The rows attribute must be a positive integer if present")
2299       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2300   }
2301   elsif ($attrs->{offset}) {
2302     # MySQL actually recommends this approach.  I cringe.
2303     $attrs->{rows} = $sql_maker->__max_int;
2304   }
2305
2306   my @limit;
2307
2308   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2309   # storage, unless software limit was requested
2310   if (
2311     #limited has_many
2312     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2313        ||
2314     # grouped prefetch (to satisfy group_by == select)
2315     ( $attrs->{group_by}
2316         &&
2317       @{$attrs->{group_by}}
2318         &&
2319       $attrs->{_prefetch_selector_range}
2320     )
2321   ) {
2322     ($ident, $select, $where, $attrs)
2323       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2324   }
2325   elsif (! $attrs->{software_limit} ) {
2326     push @limit, (
2327       $attrs->{rows} || (),
2328       $attrs->{offset} || (),
2329     );
2330   }
2331
2332   # try to simplify the joinmap further (prune unreferenced type-single joins)
2333   if (
2334     ref $ident
2335       and
2336     reftype $ident eq 'ARRAY'
2337       and
2338     @$ident != 1
2339   ) {
2340     $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2341   }
2342
2343 ###
2344   # This would be the point to deflate anything found in $where
2345   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2346   # expect a row object. And all we have is a resultsource (it is trivial
2347   # to extract deflator coderefs via $alias2source above).
2348   #
2349   # I don't see a way forward other than changing the way deflators are
2350   # invoked, and that's just bad...
2351 ###
2352
2353   return ('select', $ident, $select, $where, $attrs, @limit);
2354 }
2355
2356 # Returns a counting SELECT for a simple count
2357 # query. Abstracted so that a storage could override
2358 # this to { count => 'firstcol' } or whatever makes
2359 # sense as a performance optimization
2360 sub _count_select {
2361   #my ($self, $source, $rs_attrs) = @_;
2362   return { count => '*' };
2363 }
2364
2365 sub source_bind_attributes {
2366   shift->throw_exception(
2367     'source_bind_attributes() was never meant to be a callable public method - '
2368    .'please contact the DBIC dev-team and describe your use case so that a reasonable '
2369    .'solution can be provided'
2370    ."\nhttp://search.cpan.org/dist/DBIx-Class/lib/DBIx/Class.pm#GETTING_HELP/SUPPORT"
2371   );
2372 }
2373
2374 =head2 select
2375
2376 =over 4
2377
2378 =item Arguments: $ident, $select, $condition, $attrs
2379
2380 =back
2381
2382 Handle a SQL select statement.
2383
2384 =cut
2385
2386 sub select {
2387   my $self = shift;
2388   my ($ident, $select, $condition, $attrs) = @_;
2389   return $self->cursor_class->new($self, \@_, $attrs);
2390 }
2391
2392 sub select_single {
2393   my $self = shift;
2394   my ($rv, $sth, @bind) = $self->_select(@_);
2395   my @row = $sth->fetchrow_array;
2396   my @nextrow = $sth->fetchrow_array if @row;
2397   if(@row && @nextrow) {
2398     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2399   }
2400   # Need to call finish() to work round broken DBDs
2401   $sth->finish();
2402   return @row;
2403 }
2404
2405 =head2 sql_limit_dialect
2406
2407 This is an accessor for the default SQL limit dialect used by a particular
2408 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2409 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2410 see L<DBIx::Class::SQLMaker::LimitDialects>.
2411
2412 =cut
2413
2414 sub _dbh_sth {
2415   my ($self, $dbh, $sql) = @_;
2416
2417   # 3 is the if_active parameter which avoids active sth re-use
2418   my $sth = $self->disable_sth_caching
2419     ? $dbh->prepare($sql)
2420     : $dbh->prepare_cached($sql, {}, 3);
2421
2422   # XXX You would think RaiseError would make this impossible,
2423   #  but apparently that's not true :(
2424   $self->throw_exception(
2425     $dbh->errstr
2426       ||
2427     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
2428             .'an exception and/or setting $dbh->errstr',
2429       length ($sql) > 20
2430         ? substr($sql, 0, 20) . '...'
2431         : $sql
2432       ,
2433       'DBD::' . $dbh->{Driver}{Name},
2434     )
2435   ) if !$sth;
2436
2437   $sth;
2438 }
2439
2440 sub sth {
2441   carp_unique 'sth was mistakenly marked/documented as public, stop calling it (will be removed before DBIC v0.09)';
2442   shift->_sth(@_);
2443 }
2444
2445 sub _sth {
2446   my ($self, $sql) = @_;
2447   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2448 }
2449
2450 sub _dbh_columns_info_for {
2451   my ($self, $dbh, $table) = @_;
2452
2453   if ($dbh->can('column_info')) {
2454     my %result;
2455     my $caught;
2456     try {
2457       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2458       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2459       $sth->execute();
2460       while ( my $info = $sth->fetchrow_hashref() ){
2461         my %column_info;
2462         $column_info{data_type}   = $info->{TYPE_NAME};
2463         $column_info{size}      = $info->{COLUMN_SIZE};
2464         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2465         $column_info{default_value} = $info->{COLUMN_DEF};
2466         my $col_name = $info->{COLUMN_NAME};
2467         $col_name =~ s/^\"(.*)\"$/$1/;
2468
2469         $result{$col_name} = \%column_info;
2470       }
2471     } catch {
2472       $caught = 1;
2473     };
2474     return \%result if !$caught && scalar keys %result;
2475   }
2476
2477   my %result;
2478   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2479   $sth->execute;
2480   my @columns = @{$sth->{NAME_lc}};
2481   for my $i ( 0 .. $#columns ){
2482     my %column_info;
2483     $column_info{data_type} = $sth->{TYPE}->[$i];
2484     $column_info{size} = $sth->{PRECISION}->[$i];
2485     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2486
2487     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2488       $column_info{data_type} = $1;
2489       $column_info{size}    = $2;
2490     }
2491
2492     $result{$columns[$i]} = \%column_info;
2493   }
2494   $sth->finish;
2495
2496   foreach my $col (keys %result) {
2497     my $colinfo = $result{$col};
2498     my $type_num = $colinfo->{data_type};
2499     my $type_name;
2500     if(defined $type_num && $dbh->can('type_info')) {
2501       my $type_info = $dbh->type_info($type_num);
2502       $type_name = $type_info->{TYPE_NAME} if $type_info;
2503       $colinfo->{data_type} = $type_name if $type_name;
2504     }
2505   }
2506
2507   return \%result;
2508 }
2509
2510 sub columns_info_for {
2511   my ($self, $table) = @_;
2512   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2513 }
2514
2515 =head2 last_insert_id
2516
2517 Return the row id of the last insert.
2518
2519 =cut
2520
2521 sub _dbh_last_insert_id {
2522     my ($self, $dbh, $source, $col) = @_;
2523
2524     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2525
2526     return $id if defined $id;
2527
2528     my $class = ref $self;
2529     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2530 }
2531
2532 sub last_insert_id {
2533   my $self = shift;
2534   $self->_dbh_last_insert_id ($self->_dbh, @_);
2535 }
2536
2537 =head2 _native_data_type
2538
2539 =over 4
2540
2541 =item Arguments: $type_name
2542
2543 =back
2544
2545 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2546 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2547 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2548
2549 The default implementation returns C<undef>, implement in your Storage driver if
2550 you need this functionality.
2551
2552 Should map types from other databases to the native RDBMS type, for example
2553 C<VARCHAR2> to C<VARCHAR>.
2554
2555 Types with modifiers should map to the underlying data type. For example,
2556 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2557
2558 Composite types should map to the container type, for example
2559 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2560
2561 =cut
2562
2563 sub _native_data_type {
2564   #my ($self, $data_type) = @_;
2565   return undef
2566 }
2567
2568 # Check if placeholders are supported at all
2569 sub _determine_supports_placeholders {
2570   my $self = shift;
2571   my $dbh  = $self->_get_dbh;
2572
2573   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2574   # but it is inaccurate more often than not
2575   return try {
2576     local $dbh->{PrintError} = 0;
2577     local $dbh->{RaiseError} = 1;
2578     $dbh->do('select ?', {}, 1);
2579     1;
2580   }
2581   catch {
2582     0;
2583   };
2584 }
2585
2586 # Check if placeholders bound to non-string types throw exceptions
2587 #
2588 sub _determine_supports_typeless_placeholders {
2589   my $self = shift;
2590   my $dbh  = $self->_get_dbh;
2591
2592   return try {
2593     local $dbh->{PrintError} = 0;
2594     local $dbh->{RaiseError} = 1;
2595     # this specifically tests a bind that is NOT a string
2596     $dbh->do('select 1 where 1 = ?', {}, 1);
2597     1;
2598   }
2599   catch {
2600     0;
2601   };
2602 }
2603
2604 =head2 sqlt_type
2605
2606 Returns the database driver name.
2607
2608 =cut
2609
2610 sub sqlt_type {
2611   shift->_get_dbh->{Driver}->{Name};
2612 }
2613
2614 =head2 bind_attribute_by_data_type
2615
2616 Given a datatype from column info, returns a database specific bind
2617 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2618 let the database planner just handle it.
2619
2620 Generally only needed for special case column types, like bytea in postgres.
2621
2622 =cut
2623
2624 sub bind_attribute_by_data_type {
2625     return;
2626 }
2627
2628 =head2 is_datatype_numeric
2629
2630 Given a datatype from column_info, returns a boolean value indicating if
2631 the current RDBMS considers it a numeric value. This controls how
2632 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2633 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2634 be performed instead of the usual C<eq>.
2635
2636 =cut
2637
2638 sub is_datatype_numeric {
2639   #my ($self, $dt) = @_;
2640
2641   return 0 unless $_[1];
2642
2643   $_[1] =~ /^ (?:
2644     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2645   ) $/ix;
2646 }
2647
2648
2649 =head2 create_ddl_dir
2650
2651 =over 4
2652
2653 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2654
2655 =back
2656
2657 Creates a SQL file based on the Schema, for each of the specified
2658 database engines in C<\@databases> in the given directory.
2659 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2660
2661 Given a previous version number, this will also create a file containing
2662 the ALTER TABLE statements to transform the previous schema into the
2663 current one. Note that these statements may contain C<DROP TABLE> or
2664 C<DROP COLUMN> statements that can potentially destroy data.
2665
2666 The file names are created using the C<ddl_filename> method below, please
2667 override this method in your schema if you would like a different file
2668 name format. For the ALTER file, the same format is used, replacing
2669 $version in the name with "$preversion-$version".
2670
2671 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2672 The most common value for this would be C<< { add_drop_table => 1 } >>
2673 to have the SQL produced include a C<DROP TABLE> statement for each table
2674 created. For quoting purposes supply C<quote_table_names> and
2675 C<quote_field_names>.
2676
2677 If no arguments are passed, then the following default values are assumed:
2678
2679 =over 4
2680
2681 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2682
2683 =item version    - $schema->schema_version
2684
2685 =item directory  - './'
2686
2687 =item preversion - <none>
2688
2689 =back
2690
2691 By default, C<\%sqlt_args> will have
2692
2693  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2694
2695 merged with the hash passed in. To disable any of those features, pass in a
2696 hashref like the following
2697
2698  { ignore_constraint_names => 0, # ... other options }
2699
2700
2701 WARNING: You are strongly advised to check all SQL files created, before applying
2702 them.
2703
2704 =cut
2705
2706 sub create_ddl_dir {
2707   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2708
2709   unless ($dir) {
2710     carp "No directory given, using ./\n";
2711     $dir = './';
2712   } else {
2713       -d $dir
2714         or
2715       (require File::Path and File::Path::make_path ("$dir"))  # make_path does not like objects (i.e. Path::Class::Dir)
2716         or
2717       $self->throw_exception(
2718         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2719       );
2720   }
2721
2722   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2723
2724   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2725   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2726
2727   my $schema_version = $schema->schema_version || '1.x';
2728   $version ||= $schema_version;
2729
2730   $sqltargs = {
2731     add_drop_table => 1,
2732     ignore_constraint_names => 1,
2733     ignore_index_names => 1,
2734     %{$sqltargs || {}}
2735   };
2736
2737   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2738     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2739   }
2740
2741   my $sqlt = SQL::Translator->new( $sqltargs );
2742
2743   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2744   my $sqlt_schema = $sqlt->translate({ data => $schema })
2745     or $self->throw_exception ($sqlt->error);
2746
2747   foreach my $db (@$databases) {
2748     $sqlt->reset();
2749     $sqlt->{schema} = $sqlt_schema;
2750     $sqlt->producer($db);
2751
2752     my $file;
2753     my $filename = $schema->ddl_filename($db, $version, $dir);
2754     if (-e $filename && ($version eq $schema_version )) {
2755       # if we are dumping the current version, overwrite the DDL
2756       carp "Overwriting existing DDL file - $filename";
2757       unlink($filename);
2758     }
2759
2760     my $output = $sqlt->translate;
2761     if(!$output) {
2762       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2763       next;
2764     }
2765     if(!open($file, ">$filename")) {
2766       $self->throw_exception("Can't open $filename for writing ($!)");
2767       next;
2768     }
2769     print $file $output;
2770     close($file);
2771
2772     next unless ($preversion);
2773
2774     require SQL::Translator::Diff;
2775
2776     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2777     if(!-e $prefilename) {
2778       carp("No previous schema file found ($prefilename)");
2779       next;
2780     }
2781
2782     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2783     if(-e $difffile) {
2784       carp("Overwriting existing diff file - $difffile");
2785       unlink($difffile);
2786     }
2787
2788     my $source_schema;
2789     {
2790       my $t = SQL::Translator->new($sqltargs);
2791       $t->debug( 0 );
2792       $t->trace( 0 );
2793
2794       $t->parser( $db )
2795         or $self->throw_exception ($t->error);
2796
2797       my $out = $t->translate( $prefilename )
2798         or $self->throw_exception ($t->error);
2799
2800       $source_schema = $t->schema;
2801
2802       $source_schema->name( $prefilename )
2803         unless ( $source_schema->name );
2804     }
2805
2806     # The "new" style of producers have sane normalization and can support
2807     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2808     # And we have to diff parsed SQL against parsed SQL.
2809     my $dest_schema = $sqlt_schema;
2810
2811     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2812       my $t = SQL::Translator->new($sqltargs);
2813       $t->debug( 0 );
2814       $t->trace( 0 );
2815
2816       $t->parser( $db )
2817         or $self->throw_exception ($t->error);
2818
2819       my $out = $t->translate( $filename )
2820         or $self->throw_exception ($t->error);
2821
2822       $dest_schema = $t->schema;
2823
2824       $dest_schema->name( $filename )
2825         unless $dest_schema->name;
2826     }
2827
2828     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2829                                                   $dest_schema,   $db,
2830                                                   $sqltargs
2831                                                  );
2832     if(!open $file, ">$difffile") {
2833       $self->throw_exception("Can't write to $difffile ($!)");
2834       next;
2835     }
2836     print $file $diff;
2837     close($file);
2838   }
2839 }
2840
2841 =head2 deployment_statements
2842
2843 =over 4
2844
2845 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2846
2847 =back
2848
2849 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2850
2851 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2852 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2853
2854 C<$directory> is used to return statements from files in a previously created
2855 L</create_ddl_dir> directory and is optional. The filenames are constructed
2856 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2857
2858 If no C<$directory> is specified then the statements are constructed on the
2859 fly using L<SQL::Translator> and C<$version> is ignored.
2860
2861 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2862
2863 =cut
2864
2865 sub deployment_statements {
2866   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2867   $type ||= $self->sqlt_type;
2868   $version ||= $schema->schema_version || '1.x';
2869   $dir ||= './';
2870   my $filename = $schema->ddl_filename($type, $version, $dir);
2871   if(-f $filename)
2872   {
2873       # FIXME replace this block when a proper sane sql parser is available
2874       my $file;
2875       open($file, "<$filename")
2876         or $self->throw_exception("Can't open $filename ($!)");
2877       my @rows = <$file>;
2878       close($file);
2879       return join('', @rows);
2880   }
2881
2882   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2883     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2884   }
2885
2886   # sources needs to be a parser arg, but for simplicty allow at top level
2887   # coming in
2888   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2889       if exists $sqltargs->{sources};
2890
2891   my $tr = SQL::Translator->new(
2892     producer => "SQL::Translator::Producer::${type}",
2893     %$sqltargs,
2894     parser => 'SQL::Translator::Parser::DBIx::Class',
2895     data => $schema,
2896   );
2897
2898   return preserve_context {
2899     $tr->translate
2900   } after => sub {
2901     $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2902       unless defined $_[0];
2903   };
2904 }
2905
2906 # FIXME deploy() currently does not accurately report sql errors
2907 # Will always return true while errors are warned
2908 sub deploy {
2909   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2910   my $deploy = sub {
2911     my $line = shift;
2912     return if(!$line);
2913     return if($line =~ /^--/);
2914     # next if($line =~ /^DROP/m);
2915     return if($line =~ /^BEGIN TRANSACTION/m);
2916     return if($line =~ /^COMMIT/m);
2917     return if $line =~ /^\s+$/; # skip whitespace only
2918     $self->_query_start($line);
2919     try {
2920       # do a dbh_do cycle here, as we need some error checking in
2921       # place (even though we will ignore errors)
2922       $self->dbh_do (sub { $_[1]->do($line) });
2923     } catch {
2924       carp qq{$_ (running "${line}")};
2925     };
2926     $self->_query_end($line);
2927   };
2928   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2929   if (@statements > 1) {
2930     foreach my $statement (@statements) {
2931       $deploy->( $statement );
2932     }
2933   }
2934   elsif (@statements == 1) {
2935     # split on single line comments and end of statements
2936     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
2937       $deploy->( $line );
2938     }
2939   }
2940 }
2941
2942 =head2 datetime_parser
2943
2944 Returns the datetime parser class
2945
2946 =cut
2947
2948 sub datetime_parser {
2949   my $self = shift;
2950   return $self->{datetime_parser} ||= do {
2951     $self->build_datetime_parser(@_);
2952   };
2953 }
2954
2955 =head2 datetime_parser_type
2956
2957 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
2958
2959 =head2 build_datetime_parser
2960
2961 See L</datetime_parser>
2962
2963 =cut
2964
2965 sub build_datetime_parser {
2966   my $self = shift;
2967   my $type = $self->datetime_parser_type(@_);
2968   return $type;
2969 }
2970
2971
2972 =head2 is_replicating
2973
2974 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2975 replicate from a master database.  Default is undef, which is the result
2976 returned by databases that don't support replication.
2977
2978 =cut
2979
2980 sub is_replicating {
2981     return;
2982
2983 }
2984
2985 =head2 lag_behind_master
2986
2987 Returns a number that represents a certain amount of lag behind a master db
2988 when a given storage is replicating.  The number is database dependent, but
2989 starts at zero and increases with the amount of lag. Default in undef
2990
2991 =cut
2992
2993 sub lag_behind_master {
2994     return;
2995 }
2996
2997 =head2 relname_to_table_alias
2998
2999 =over 4
3000
3001 =item Arguments: $relname, $join_count
3002
3003 =back
3004
3005 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
3006 queries.
3007
3008 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
3009 way these aliases are named.
3010
3011 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
3012 otherwise C<"$relname">.
3013
3014 =cut
3015
3016 sub relname_to_table_alias {
3017   my ($self, $relname, $join_count) = @_;
3018
3019   my $alias = ($join_count && $join_count > 1 ?
3020     join('_', $relname, $join_count) : $relname);
3021
3022   return $alias;
3023 }
3024
3025 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
3026 # version and it may be necessary to amend or override it for a specific storage
3027 # if such binds are necessary.
3028 sub _max_column_bytesize {
3029   my ($self, $attr) = @_;
3030
3031   my $max_size;
3032
3033   if ($attr->{sqlt_datatype}) {
3034     my $data_type = lc($attr->{sqlt_datatype});
3035
3036     if ($attr->{sqlt_size}) {
3037
3038       # String/sized-binary types
3039       if ($data_type =~ /^(?:
3040           l? (?:var)? char(?:acter)? (?:\s*varying)?
3041             |
3042           (?:var)? binary (?:\s*varying)?
3043             |
3044           raw
3045         )\b/x
3046       ) {
3047         $max_size = $attr->{sqlt_size};
3048       }
3049       # Other charset/unicode types, assume scale of 4
3050       elsif ($data_type =~ /^(?:
3051           national \s* character (?:\s*varying)?
3052             |
3053           nchar
3054             |
3055           univarchar
3056             |
3057           nvarchar
3058         )\b/x
3059       ) {
3060         $max_size = $attr->{sqlt_size} * 4;
3061       }
3062     }
3063
3064     if (!$max_size and !$self->_is_lob_type($data_type)) {
3065       $max_size = 100 # for all other (numeric?) datatypes
3066     }
3067   }
3068
3069   $max_size || $self->_dbic_connect_attributes->{LongReadLen} || $self->_get_dbh->{LongReadLen} || 8000;
3070 }
3071
3072 # Determine if a data_type is some type of BLOB
3073 sub _is_lob_type {
3074   my ($self, $data_type) = @_;
3075   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
3076     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
3077                                   |varchar|character\s*varying|nvarchar
3078                                   |national\s*character\s*varying))?\z/xi);
3079 }
3080
3081 sub _is_binary_lob_type {
3082   my ($self, $data_type) = @_;
3083   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
3084     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
3085 }
3086
3087 sub _is_text_lob_type {
3088   my ($self, $data_type) = @_;
3089   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
3090     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
3091                         |national\s*character\s*varying))\z/xi);
3092 }
3093
3094 # Determine if a data_type is some type of a binary type
3095 sub _is_binary_type {
3096   my ($self, $data_type) = @_;
3097   $data_type && ($self->_is_binary_lob_type($data_type)
3098     || $data_type =~ /(?:var)?(?:binary|bit|graphic)(?:\s*varying)?/i);
3099 }
3100
3101 1;
3102
3103 =head1 USAGE NOTES
3104
3105 =head2 DBIx::Class and AutoCommit
3106
3107 DBIx::Class can do some wonderful magic with handling exceptions,
3108 disconnections, and transactions when you use C<< AutoCommit => 1 >>
3109 (the default) combined with L<txn_do|DBIx::Class::Storage/txn_do> for
3110 transaction support.
3111
3112 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
3113 in an assumed transaction between commits, and you're telling us you'd
3114 like to manage that manually.  A lot of the magic protections offered by
3115 this module will go away.  We can't protect you from exceptions due to database
3116 disconnects because we don't know anything about how to restart your
3117 transactions.  You're on your own for handling all sorts of exceptional
3118 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
3119 be with raw DBI.
3120
3121
3122 =head1 AUTHORS
3123
3124 Matt S. Trout <mst@shadowcatsystems.co.uk>
3125
3126 Andy Grundman <andy@hybridized.org>
3127
3128 =head1 LICENSE
3129
3130 You may distribute this code under the same terms as Perl itself.
3131
3132 =cut