Replace all use of wantarray tri-calls with explicit return or preserve_context
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use DBIx::Class::Exception;
12 use Scalar::Util qw/refaddr weaken reftype blessed/;
13 use List::Util qw/first/;
14 use Sub::Name 'subname';
15 use Context::Preserve 'preserve_context';
16 use Try::Tiny;
17 use overload ();
18 use Data::Compare (); # no imports!!! guard against insane architecture
19 use namespace::clean;
20
21 # default cursor class, overridable in connect_info attributes
22 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
23
24 __PACKAGE__->mk_group_accessors('inherited' => qw/
25   sql_limit_dialect sql_quote_char sql_name_sep
26 /);
27
28 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
29
30 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
31 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
32
33 __PACKAGE__->sql_name_sep('.');
34
35 __PACKAGE__->mk_group_accessors('simple' => qw/
36   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
37   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts _dbh_autocommit
38   _perform_autoinc_retrieval _autoinc_supplied_for_op
39 /);
40
41 # the values for these accessors are picked out (and deleted) from
42 # the attribute hashref passed to connect_info
43 my @storage_options = qw/
44   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
45   disable_sth_caching unsafe auto_savepoint
46 /;
47 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
48
49
50 # capability definitions, using a 2-tiered accessor system
51 # The rationale is:
52 #
53 # A driver/user may define _use_X, which blindly without any checks says:
54 # "(do not) use this capability", (use_dbms_capability is an "inherited"
55 # type accessor)
56 #
57 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
58 # accessor, which in turn calls _determine_supports_X, and stores the return
59 # in a special slot on the storage object, which is wiped every time a $dbh
60 # reconnection takes place (it is not guaranteed that upon reconnection we
61 # will get the same rdbms version). _determine_supports_X does not need to
62 # exist on a driver, as we ->can for it before calling.
63
64 my @capabilities = (qw/
65   insert_returning
66   insert_returning_bound
67
68   multicolumn_in
69
70   placeholders
71   typeless_placeholders
72
73   join_optimizer
74 /);
75 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
76 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
77
78 # on by default, not strictly a capability (pending rewrite)
79 __PACKAGE__->_use_join_optimizer (1);
80 sub _determine_supports_join_optimizer { 1 };
81
82 # Each of these methods need _determine_driver called before itself
83 # in order to function reliably. This is a purely DRY optimization
84 #
85 # get_(use)_dbms_capability need to be called on the correct Storage
86 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
87 # _determine_supports_X which obv. needs a correct driver as well
88 my @rdbms_specific_methods = qw/
89   deployment_statements
90   sqlt_type
91   sql_maker
92   build_datetime_parser
93   datetime_parser_type
94
95   txn_begin
96   insert
97   insert_bulk
98   update
99   delete
100   select
101   select_single
102   with_deferred_fk_checks
103
104   get_use_dbms_capability
105   get_dbms_capability
106
107   _server_info
108   _get_server_version
109 /;
110
111 for my $meth (@rdbms_specific_methods) {
112
113   my $orig = __PACKAGE__->can ($meth)
114     or die "$meth is not a ::Storage::DBI method!";
115
116   no strict qw/refs/;
117   no warnings qw/redefine/;
118   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
119     if (
120       # only fire when invoked on an instance, a valid class-based invocation
121       # would e.g. be setting a default for an inherited accessor
122       ref $_[0]
123         and
124       ! $_[0]->_driver_determined
125         and
126       ! $_[0]->{_in_determine_driver}
127     ) {
128       $_[0]->_determine_driver;
129
130       # This for some reason crashes and burns on perl 5.8.1
131       # IFF the method ends up throwing an exception
132       #goto $_[0]->can ($meth);
133
134       my $cref = $_[0]->can ($meth);
135       goto $cref;
136     }
137
138     goto $orig;
139   };
140 }
141
142 =head1 NAME
143
144 DBIx::Class::Storage::DBI - DBI storage handler
145
146 =head1 SYNOPSIS
147
148   my $schema = MySchema->connect('dbi:SQLite:my.db');
149
150   $schema->storage->debug(1);
151
152   my @stuff = $schema->storage->dbh_do(
153     sub {
154       my ($storage, $dbh, @args) = @_;
155       $dbh->do("DROP TABLE authors");
156     },
157     @column_list
158   );
159
160   $schema->resultset('Book')->search({
161      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
162   });
163
164 =head1 DESCRIPTION
165
166 This class represents the connection to an RDBMS via L<DBI>.  See
167 L<DBIx::Class::Storage> for general information.  This pod only
168 documents DBI-specific methods and behaviors.
169
170 =head1 METHODS
171
172 =cut
173
174 sub new {
175   my $new = shift->next::method(@_);
176
177   $new->_sql_maker_opts({});
178   $new->_dbh_details({});
179   $new->{_in_do_block} = 0;
180   $new->{_dbh_gen} = 0;
181
182   # read below to see what this does
183   $new->_arm_global_destructor;
184
185   $new;
186 }
187
188 # This is hack to work around perl shooting stuff in random
189 # order on exit(). If we do not walk the remaining storage
190 # objects in an END block, there is a *small but real* chance
191 # of a fork()ed child to kill the parent's shared DBI handle,
192 # *before perl reaches the DESTROY in this package*
193 # Yes, it is ugly and effective.
194 # Additionally this registry is used by the CLONE method to
195 # make sure no handles are shared between threads
196 {
197   my %seek_and_destroy;
198
199   sub _arm_global_destructor {
200     my $self = shift;
201     my $key = refaddr ($self);
202     $seek_and_destroy{$key} = $self;
203     weaken ($seek_and_destroy{$key});
204   }
205
206   END {
207     local $?; # just in case the DBI destructor changes it somehow
208
209     # destroy just the object if not native to this process/thread
210     $_->_verify_pid for (grep
211       { defined $_ }
212       values %seek_and_destroy
213     );
214   }
215
216   sub CLONE {
217     # As per DBI's recommendation, DBIC disconnects all handles as
218     # soon as possible (DBIC will reconnect only on demand from within
219     # the thread)
220     for (values %seek_and_destroy) {
221       next unless $_;
222       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
223       $_->_dbh(undef);
224
225       $_->transaction_depth(0);
226       $_->savepoints([]);
227     }
228   }
229 }
230
231 sub DESTROY {
232   my $self = shift;
233
234   # some databases spew warnings on implicit disconnect
235   $self->_verify_pid;
236   local $SIG{__WARN__} = sub {};
237   $self->_dbh(undef);
238
239   # this op is necessary, since the very last perl runtime statement
240   # triggers a global destruction shootout, and the $SIG localization
241   # may very well be destroyed before perl actually gets to do the
242   # $dbh undef
243   1;
244 }
245
246 # handle pid changes correctly - do not destroy parent's connection
247 sub _verify_pid {
248   my $self = shift;
249
250   my $pid = $self->_conn_pid;
251   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
252     $dbh->{InactiveDestroy} = 1;
253     $self->{_dbh_gen}++;
254     $self->_dbh(undef);
255     $self->transaction_depth(0);
256     $self->savepoints([]);
257   }
258
259   return;
260 }
261
262 =head2 connect_info
263
264 This method is normally called by L<DBIx::Class::Schema/connection>, which
265 encapsulates its argument list in an arrayref before passing them here.
266
267 The argument list may contain:
268
269 =over
270
271 =item *
272
273 The same 4-element argument set one would normally pass to
274 L<DBI/connect>, optionally followed by
275 L<extra attributes|/DBIx::Class specific connection attributes>
276 recognized by DBIx::Class:
277
278   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
279
280 =item *
281
282 A single code reference which returns a connected
283 L<DBI database handle|DBI/connect> optionally followed by
284 L<extra attributes|/DBIx::Class specific connection attributes> recognized
285 by DBIx::Class:
286
287   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
288
289 =item *
290
291 A single hashref with all the attributes and the dsn/user/password
292 mixed together:
293
294   $connect_info_args = [{
295     dsn => $dsn,
296     user => $user,
297     password => $pass,
298     %dbi_attributes,
299     %extra_attributes,
300   }];
301
302   $connect_info_args = [{
303     dbh_maker => sub { DBI->connect (...) },
304     %dbi_attributes,
305     %extra_attributes,
306   }];
307
308 This is particularly useful for L<Catalyst> based applications, allowing the
309 following config (L<Config::General> style):
310
311   <Model::DB>
312     schema_class   App::DB
313     <connect_info>
314       dsn          dbi:mysql:database=test
315       user         testuser
316       password     TestPass
317       AutoCommit   1
318     </connect_info>
319   </Model::DB>
320
321 The C<dsn>/C<user>/C<password> combination can be substituted by the
322 C<dbh_maker> key whose value is a coderef that returns a connected
323 L<DBI database handle|DBI/connect>
324
325 =back
326
327 Please note that the L<DBI> docs recommend that you always explicitly
328 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
329 recommends that it be set to I<1>, and that you perform transactions
330 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
331 to I<1> if you do not do explicitly set it to zero.  This is the default
332 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
333
334 =head3 DBIx::Class specific connection attributes
335
336 In addition to the standard L<DBI|DBI/ATTRIBUTES COMMON TO ALL HANDLES>
337 L<connection|DBI/Database Handle Attributes> attributes, DBIx::Class recognizes
338 the following connection options. These options can be mixed in with your other
339 L<DBI> connection attributes, or placed in a separate hashref
340 (C<\%extra_attributes>) as shown above.
341
342 Every time C<connect_info> is invoked, any previous settings for
343 these options will be cleared before setting the new ones, regardless of
344 whether any options are specified in the new C<connect_info>.
345
346
347 =over
348
349 =item on_connect_do
350
351 Specifies things to do immediately after connecting or re-connecting to
352 the database.  Its value may contain:
353
354 =over
355
356 =item a scalar
357
358 This contains one SQL statement to execute.
359
360 =item an array reference
361
362 This contains SQL statements to execute in order.  Each element contains
363 a string or a code reference that returns a string.
364
365 =item a code reference
366
367 This contains some code to execute.  Unlike code references within an
368 array reference, its return value is ignored.
369
370 =back
371
372 =item on_disconnect_do
373
374 Takes arguments in the same form as L</on_connect_do> and executes them
375 immediately before disconnecting from the database.
376
377 Note, this only runs if you explicitly call L</disconnect> on the
378 storage object.
379
380 =item on_connect_call
381
382 A more generalized form of L</on_connect_do> that calls the specified
383 C<connect_call_METHOD> methods in your storage driver.
384
385   on_connect_do => 'select 1'
386
387 is equivalent to:
388
389   on_connect_call => [ [ do_sql => 'select 1' ] ]
390
391 Its values may contain:
392
393 =over
394
395 =item a scalar
396
397 Will call the C<connect_call_METHOD> method.
398
399 =item a code reference
400
401 Will execute C<< $code->($storage) >>
402
403 =item an array reference
404
405 Each value can be a method name or code reference.
406
407 =item an array of arrays
408
409 For each array, the first item is taken to be the C<connect_call_> method name
410 or code reference, and the rest are parameters to it.
411
412 =back
413
414 Some predefined storage methods you may use:
415
416 =over
417
418 =item do_sql
419
420 Executes a SQL string or a code reference that returns a SQL string. This is
421 what L</on_connect_do> and L</on_disconnect_do> use.
422
423 It can take:
424
425 =over
426
427 =item a scalar
428
429 Will execute the scalar as SQL.
430
431 =item an arrayref
432
433 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
434 attributes hashref and bind values.
435
436 =item a code reference
437
438 Will execute C<< $code->($storage) >> and execute the return array refs as
439 above.
440
441 =back
442
443 =item datetime_setup
444
445 Execute any statements necessary to initialize the database session to return
446 and accept datetime/timestamp values used with
447 L<DBIx::Class::InflateColumn::DateTime>.
448
449 Only necessary for some databases, see your specific storage driver for
450 implementation details.
451
452 =back
453
454 =item on_disconnect_call
455
456 Takes arguments in the same form as L</on_connect_call> and executes them
457 immediately before disconnecting from the database.
458
459 Calls the C<disconnect_call_METHOD> methods as opposed to the
460 C<connect_call_METHOD> methods called by L</on_connect_call>.
461
462 Note, this only runs if you explicitly call L</disconnect> on the
463 storage object.
464
465 =item disable_sth_caching
466
467 If set to a true value, this option will disable the caching of
468 statement handles via L<DBI/prepare_cached>.
469
470 =item limit_dialect
471
472 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
473 default L</sql_limit_dialect> setting of the storage (if any). For a list
474 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
475
476 =item quote_names
477
478 When true automatically sets L</quote_char> and L</name_sep> to the characters
479 appropriate for your particular RDBMS. This option is preferred over specifying
480 L</quote_char> directly.
481
482 =item quote_char
483
484 Specifies what characters to use to quote table and column names.
485
486 C<quote_char> expects either a single character, in which case is it
487 is placed on either side of the table/column name, or an arrayref of length
488 2 in which case the table/column name is placed between the elements.
489
490 For example under MySQL you should use C<< quote_char => '`' >>, and for
491 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
492
493 =item name_sep
494
495 This parameter is only useful in conjunction with C<quote_char>, and is used to
496 specify the character that separates elements (schemas, tables, columns) from
497 each other. If unspecified it defaults to the most commonly used C<.>.
498
499 =item unsafe
500
501 This Storage driver normally installs its own C<HandleError>, sets
502 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
503 all database handles, including those supplied by a coderef.  It does this
504 so that it can have consistent and useful error behavior.
505
506 If you set this option to a true value, Storage will not do its usual
507 modifications to the database handle's attributes, and instead relies on
508 the settings in your connect_info DBI options (or the values you set in
509 your connection coderef, in the case that you are connecting via coderef).
510
511 Note that your custom settings can cause Storage to malfunction,
512 especially if you set a C<HandleError> handler that suppresses exceptions
513 and/or disable C<RaiseError>.
514
515 =item auto_savepoint
516
517 If this option is true, L<DBIx::Class> will use savepoints when nesting
518 transactions, making it possible to recover from failure in the inner
519 transaction without having to abort all outer transactions.
520
521 =item cursor_class
522
523 Use this argument to supply a cursor class other than the default
524 L<DBIx::Class::Storage::DBI::Cursor>.
525
526 =back
527
528 Some real-life examples of arguments to L</connect_info> and
529 L<DBIx::Class::Schema/connect>
530
531   # Simple SQLite connection
532   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
533
534   # Connect via subref
535   ->connect_info([ sub { DBI->connect(...) } ]);
536
537   # Connect via subref in hashref
538   ->connect_info([{
539     dbh_maker => sub { DBI->connect(...) },
540     on_connect_do => 'alter session ...',
541   }]);
542
543   # A bit more complicated
544   ->connect_info(
545     [
546       'dbi:Pg:dbname=foo',
547       'postgres',
548       'my_pg_password',
549       { AutoCommit => 1 },
550       { quote_char => q{"} },
551     ]
552   );
553
554   # Equivalent to the previous example
555   ->connect_info(
556     [
557       'dbi:Pg:dbname=foo',
558       'postgres',
559       'my_pg_password',
560       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
561     ]
562   );
563
564   # Same, but with hashref as argument
565   # See parse_connect_info for explanation
566   ->connect_info(
567     [{
568       dsn         => 'dbi:Pg:dbname=foo',
569       user        => 'postgres',
570       password    => 'my_pg_password',
571       AutoCommit  => 1,
572       quote_char  => q{"},
573       name_sep    => q{.},
574     }]
575   );
576
577   # Subref + DBIx::Class-specific connection options
578   ->connect_info(
579     [
580       sub { DBI->connect(...) },
581       {
582           quote_char => q{`},
583           name_sep => q{@},
584           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
585           disable_sth_caching => 1,
586       },
587     ]
588   );
589
590
591
592 =cut
593
594 sub connect_info {
595   my ($self, $info) = @_;
596
597   return $self->_connect_info if !$info;
598
599   $self->_connect_info($info); # copy for _connect_info
600
601   $info = $self->_normalize_connect_info($info)
602     if ref $info eq 'ARRAY';
603
604   for my $storage_opt (keys %{ $info->{storage_options} }) {
605     my $value = $info->{storage_options}{$storage_opt};
606
607     $self->$storage_opt($value);
608   }
609
610   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
611   #  the new set of options
612   $self->_sql_maker(undef);
613   $self->_sql_maker_opts({});
614
615   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
616     my $value = $info->{sql_maker_options}{$sql_maker_opt};
617
618     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
619   }
620
621   my %attrs = (
622     %{ $self->_default_dbi_connect_attributes || {} },
623     %{ $info->{attributes} || {} },
624   );
625
626   my @args = @{ $info->{arguments} };
627
628   if (keys %attrs and ref $args[0] ne 'CODE') {
629     carp
630         'You provided explicit AutoCommit => 0 in your connection_info. '
631       . 'This is almost universally a bad idea (see the footnotes of '
632       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
633       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
634       . 'this warning.'
635       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
636
637     push @args, \%attrs if keys %attrs;
638   }
639   $self->_dbi_connect_info(\@args);
640
641   # FIXME - dirty:
642   # save attributes them in a separate accessor so they are always
643   # introspectable, even in case of a CODE $dbhmaker
644   $self->_dbic_connect_attributes (\%attrs);
645
646   return $self->_connect_info;
647 }
648
649 sub _normalize_connect_info {
650   my ($self, $info_arg) = @_;
651   my %info;
652
653   my @args = @$info_arg;  # take a shallow copy for further mutilation
654
655   # combine/pre-parse arguments depending on invocation style
656
657   my %attrs;
658   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
659     %attrs = %{ $args[1] || {} };
660     @args = $args[0];
661   }
662   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
663     %attrs = %{$args[0]};
664     @args = ();
665     if (my $code = delete $attrs{dbh_maker}) {
666       @args = $code;
667
668       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
669       if (@ignored) {
670         carp sprintf (
671             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
672           . "to the result of 'dbh_maker'",
673
674           join (', ', map { "'$_'" } (@ignored) ),
675         );
676       }
677     }
678     else {
679       @args = delete @attrs{qw/dsn user password/};
680     }
681   }
682   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
683     %attrs = (
684       % { $args[3] || {} },
685       % { $args[4] || {} },
686     );
687     @args = @args[0,1,2];
688   }
689
690   $info{arguments} = \@args;
691
692   my @storage_opts = grep exists $attrs{$_},
693     @storage_options, 'cursor_class';
694
695   @{ $info{storage_options} }{@storage_opts} =
696     delete @attrs{@storage_opts} if @storage_opts;
697
698   my @sql_maker_opts = grep exists $attrs{$_},
699     qw/limit_dialect quote_char name_sep quote_names/;
700
701   @{ $info{sql_maker_options} }{@sql_maker_opts} =
702     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
703
704   $info{attributes} = \%attrs if %attrs;
705
706   return \%info;
707 }
708
709 sub _default_dbi_connect_attributes () {
710   +{
711     AutoCommit => 1,
712     PrintError => 0,
713     RaiseError => 1,
714     ShowErrorStatement => 1,
715   };
716 }
717
718 =head2 on_connect_do
719
720 This method is deprecated in favour of setting via L</connect_info>.
721
722 =cut
723
724 =head2 on_disconnect_do
725
726 This method is deprecated in favour of setting via L</connect_info>.
727
728 =cut
729
730 sub _parse_connect_do {
731   my ($self, $type) = @_;
732
733   my $val = $self->$type;
734   return () if not defined $val;
735
736   my @res;
737
738   if (not ref($val)) {
739     push @res, [ 'do_sql', $val ];
740   } elsif (ref($val) eq 'CODE') {
741     push @res, $val;
742   } elsif (ref($val) eq 'ARRAY') {
743     push @res, map { [ 'do_sql', $_ ] } @$val;
744   } else {
745     $self->throw_exception("Invalid type for $type: ".ref($val));
746   }
747
748   return \@res;
749 }
750
751 =head2 dbh_do
752
753 Arguments: ($subref | $method_name), @extra_coderef_args?
754
755 Execute the given $subref or $method_name using the new exception-based
756 connection management.
757
758 The first two arguments will be the storage object that C<dbh_do> was called
759 on and a database handle to use.  Any additional arguments will be passed
760 verbatim to the called subref as arguments 2 and onwards.
761
762 Using this (instead of $self->_dbh or $self->dbh) ensures correct
763 exception handling and reconnection (or failover in future subclasses).
764
765 Your subref should have no side-effects outside of the database, as
766 there is the potential for your subref to be partially double-executed
767 if the database connection was stale/dysfunctional.
768
769 Example:
770
771   my @stuff = $schema->storage->dbh_do(
772     sub {
773       my ($storage, $dbh, @cols) = @_;
774       my $cols = join(q{, }, @cols);
775       $dbh->selectrow_array("SELECT $cols FROM foo");
776     },
777     @column_list
778   );
779
780 =cut
781
782 sub dbh_do {
783   my $self = shift;
784   my $run_target = shift;
785
786   # short circuit when we know there is no need for a runner
787   #
788   # FIXME - asumption may be wrong
789   # the rationale for the txn_depth check is that if this block is a part
790   # of a larger transaction, everything up to that point is screwed anyway
791   return $self->$run_target($self->_get_dbh, @_)
792     if $self->{_in_do_block} or $self->transaction_depth;
793
794   my $args = \@_;
795
796   DBIx::Class::Storage::BlockRunner->new(
797     storage => $self,
798     run_code => sub { $self->$run_target ($self->_get_dbh, @$args ) },
799     wrap_txn => 0,
800     retry_handler => sub { ! ( $_[0]->retried_count or $_[0]->storage->connected ) },
801   )->run;
802 }
803
804 sub txn_do {
805   $_[0]->_get_dbh; # connects or reconnects on pid change, necessary to grab correct txn_depth
806   shift->next::method(@_);
807 }
808
809 =head2 disconnect
810
811 Our C<disconnect> method also performs a rollback first if the
812 database is not in C<AutoCommit> mode.
813
814 =cut
815
816 sub disconnect {
817   my ($self) = @_;
818
819   if( $self->_dbh ) {
820     my @actions;
821
822     push @actions, ( $self->on_disconnect_call || () );
823     push @actions, $self->_parse_connect_do ('on_disconnect_do');
824
825     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
826
827     # stops the "implicit rollback on disconnect" warning
828     $self->_exec_txn_rollback unless $self->_dbh_autocommit;
829
830     %{ $self->_dbh->{CachedKids} } = ();
831     $self->_dbh->disconnect;
832     $self->_dbh(undef);
833     $self->{_dbh_gen}++;
834   }
835 }
836
837 =head2 with_deferred_fk_checks
838
839 =over 4
840
841 =item Arguments: C<$coderef>
842
843 =item Return Value: The return value of $coderef
844
845 =back
846
847 Storage specific method to run the code ref with FK checks deferred or
848 in MySQL's case disabled entirely.
849
850 =cut
851
852 # Storage subclasses should override this
853 sub with_deferred_fk_checks {
854   my ($self, $sub) = @_;
855   $sub->();
856 }
857
858 =head2 connected
859
860 =over
861
862 =item Arguments: none
863
864 =item Return Value: 1|0
865
866 =back
867
868 Verifies that the current database handle is active and ready to execute
869 an SQL statement (e.g. the connection did not get stale, server is still
870 answering, etc.) This method is used internally by L</dbh>.
871
872 =cut
873
874 sub connected {
875   my $self = shift;
876   return 0 unless $self->_seems_connected;
877
878   #be on the safe side
879   local $self->_dbh->{RaiseError} = 1;
880
881   return $self->_ping;
882 }
883
884 sub _seems_connected {
885   my $self = shift;
886
887   $self->_verify_pid;
888
889   my $dbh = $self->_dbh
890     or return 0;
891
892   return $dbh->FETCH('Active');
893 }
894
895 sub _ping {
896   my $self = shift;
897
898   my $dbh = $self->_dbh or return 0;
899
900   return $dbh->ping;
901 }
902
903 sub ensure_connected {
904   my ($self) = @_;
905
906   unless ($self->connected) {
907     $self->_populate_dbh;
908   }
909 }
910
911 =head2 dbh
912
913 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
914 is guaranteed to be healthy by implicitly calling L</connected>, and if
915 necessary performing a reconnection before returning. Keep in mind that this
916 is very B<expensive> on some database engines. Consider using L</dbh_do>
917 instead.
918
919 =cut
920
921 sub dbh {
922   my ($self) = @_;
923
924   if (not $self->_dbh) {
925     $self->_populate_dbh;
926   } else {
927     $self->ensure_connected;
928   }
929   return $self->_dbh;
930 }
931
932 # this is the internal "get dbh or connect (don't check)" method
933 sub _get_dbh {
934   my $self = shift;
935   $self->_verify_pid;
936   $self->_populate_dbh unless $self->_dbh;
937   return $self->_dbh;
938 }
939
940 sub sql_maker {
941   my ($self) = @_;
942   unless ($self->_sql_maker) {
943     my $sql_maker_class = $self->sql_maker_class;
944
945     my %opts = %{$self->_sql_maker_opts||{}};
946     my $dialect =
947       $opts{limit_dialect}
948         ||
949       $self->sql_limit_dialect
950         ||
951       do {
952         my $s_class = (ref $self) || $self;
953         carp (
954           "Your storage class ($s_class) does not set sql_limit_dialect and you "
955         . 'have not supplied an explicit limit_dialect in your connection_info. '
956         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
957         . 'databases but can be (and often is) painfully slow. '
958         . "Please file an RT ticket against '$s_class' ."
959         );
960
961         'GenericSubQ';
962       }
963     ;
964
965     my ($quote_char, $name_sep);
966
967     if ($opts{quote_names}) {
968       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
969         my $s_class = (ref $self) || $self;
970         carp (
971           "You requested 'quote_names' but your storage class ($s_class) does "
972         . 'not explicitly define a default sql_quote_char and you have not '
973         . 'supplied a quote_char as part of your connection_info. DBIC will '
974         .q{default to the ANSI SQL standard quote '"', which works most of }
975         . "the time. Please file an RT ticket against '$s_class'."
976         );
977
978         '"'; # RV
979       };
980
981       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
982     }
983
984     $self->_sql_maker($sql_maker_class->new(
985       bindtype=>'columns',
986       array_datatypes => 1,
987       limit_dialect => $dialect,
988       ($quote_char ? (quote_char => $quote_char) : ()),
989       name_sep => ($name_sep || '.'),
990       %opts,
991     ));
992   }
993   return $self->_sql_maker;
994 }
995
996 # nothing to do by default
997 sub _rebless {}
998 sub _init {}
999
1000 sub _populate_dbh {
1001   my ($self) = @_;
1002
1003   my @info = @{$self->_dbi_connect_info || []};
1004   $self->_dbh(undef); # in case ->connected failed we might get sent here
1005   $self->_dbh_details({}); # reset everything we know
1006
1007   $self->_dbh($self->_connect(@info));
1008
1009   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1010
1011   $self->_determine_driver;
1012
1013   # Always set the transaction depth on connect, since
1014   #  there is no transaction in progress by definition
1015   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1016
1017   $self->_run_connection_actions unless $self->{_in_determine_driver};
1018 }
1019
1020 sub _run_connection_actions {
1021   my $self = shift;
1022   my @actions;
1023
1024   push @actions, ( $self->on_connect_call || () );
1025   push @actions, $self->_parse_connect_do ('on_connect_do');
1026
1027   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1028 }
1029
1030
1031
1032 sub set_use_dbms_capability {
1033   $_[0]->set_inherited ($_[1], $_[2]);
1034 }
1035
1036 sub get_use_dbms_capability {
1037   my ($self, $capname) = @_;
1038
1039   my $use = $self->get_inherited ($capname);
1040   return defined $use
1041     ? $use
1042     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1043   ;
1044 }
1045
1046 sub set_dbms_capability {
1047   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1048 }
1049
1050 sub get_dbms_capability {
1051   my ($self, $capname) = @_;
1052
1053   my $cap = $self->_dbh_details->{capability}{$capname};
1054
1055   unless (defined $cap) {
1056     if (my $meth = $self->can ("_determine$capname")) {
1057       $cap = $self->$meth ? 1 : 0;
1058     }
1059     else {
1060       $cap = 0;
1061     }
1062
1063     $self->set_dbms_capability ($capname, $cap);
1064   }
1065
1066   return $cap;
1067 }
1068
1069 sub _server_info {
1070   my $self = shift;
1071
1072   my $info;
1073   unless ($info = $self->_dbh_details->{info}) {
1074
1075     $info = {};
1076
1077     my $server_version = try { $self->_get_server_version };
1078
1079     if (defined $server_version) {
1080       $info->{dbms_version} = $server_version;
1081
1082       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1083       my @verparts = split (/\./, $numeric_version);
1084       if (
1085         @verparts
1086           &&
1087         $verparts[0] <= 999
1088       ) {
1089         # consider only up to 3 version parts, iff not more than 3 digits
1090         my @use_parts;
1091         while (@verparts && @use_parts < 3) {
1092           my $p = shift @verparts;
1093           last if $p > 999;
1094           push @use_parts, $p;
1095         }
1096         push @use_parts, 0 while @use_parts < 3;
1097
1098         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1099       }
1100     }
1101
1102     $self->_dbh_details->{info} = $info;
1103   }
1104
1105   return $info;
1106 }
1107
1108 sub _get_server_version {
1109   shift->_dbh_get_info(18);
1110 }
1111
1112 sub _dbh_get_info {
1113   my ($self, $info) = @_;
1114
1115   return try { $self->_get_dbh->get_info($info) } || undef;
1116 }
1117
1118 sub _determine_driver {
1119   my ($self) = @_;
1120
1121   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1122     my $started_connected = 0;
1123     local $self->{_in_determine_driver} = 1;
1124
1125     if (ref($self) eq __PACKAGE__) {
1126       my $driver;
1127       if ($self->_dbh) { # we are connected
1128         $driver = $self->_dbh->{Driver}{Name};
1129         $started_connected = 1;
1130       } else {
1131         # if connect_info is a CODEREF, we have no choice but to connect
1132         if (ref $self->_dbi_connect_info->[0] &&
1133             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1134           $self->_populate_dbh;
1135           $driver = $self->_dbh->{Driver}{Name};
1136         }
1137         else {
1138           # try to use dsn to not require being connected, the driver may still
1139           # force a connection in _rebless to determine version
1140           # (dsn may not be supplied at all if all we do is make a mock-schema)
1141           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1142           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1143           $driver ||= $ENV{DBI_DRIVER};
1144         }
1145       }
1146
1147       if ($driver) {
1148         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1149         if ($self->load_optional_class($storage_class)) {
1150           mro::set_mro($storage_class, 'c3');
1151           bless $self, $storage_class;
1152           $self->_rebless();
1153         }
1154       }
1155     }
1156
1157     $self->_driver_determined(1);
1158
1159     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1160
1161     $self->_init; # run driver-specific initializations
1162
1163     $self->_run_connection_actions
1164         if !$started_connected && defined $self->_dbh;
1165   }
1166 }
1167
1168 sub _do_connection_actions {
1169   my $self          = shift;
1170   my $method_prefix = shift;
1171   my $call          = shift;
1172
1173   if (not ref($call)) {
1174     my $method = $method_prefix . $call;
1175     $self->$method(@_);
1176   } elsif (ref($call) eq 'CODE') {
1177     $self->$call(@_);
1178   } elsif (ref($call) eq 'ARRAY') {
1179     if (ref($call->[0]) ne 'ARRAY') {
1180       $self->_do_connection_actions($method_prefix, $_) for @$call;
1181     } else {
1182       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1183     }
1184   } else {
1185     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1186   }
1187
1188   return $self;
1189 }
1190
1191 sub connect_call_do_sql {
1192   my $self = shift;
1193   $self->_do_query(@_);
1194 }
1195
1196 sub disconnect_call_do_sql {
1197   my $self = shift;
1198   $self->_do_query(@_);
1199 }
1200
1201 # override in db-specific backend when necessary
1202 sub connect_call_datetime_setup { 1 }
1203
1204 sub _do_query {
1205   my ($self, $action) = @_;
1206
1207   if (ref $action eq 'CODE') {
1208     $action = $action->($self);
1209     $self->_do_query($_) foreach @$action;
1210   }
1211   else {
1212     # Most debuggers expect ($sql, @bind), so we need to exclude
1213     # the attribute hash which is the second argument to $dbh->do
1214     # furthermore the bind values are usually to be presented
1215     # as named arrayref pairs, so wrap those here too
1216     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1217     my $sql = shift @do_args;
1218     my $attrs = shift @do_args;
1219     my @bind = map { [ undef, $_ ] } @do_args;
1220
1221     $self->dbh_do(sub {
1222       $_[0]->_query_start($sql, \@bind);
1223       $_[1]->do($sql, $attrs, @do_args);
1224       $_[0]->_query_end($sql, \@bind);
1225     });
1226   }
1227
1228   return $self;
1229 }
1230
1231 sub _connect {
1232   my ($self, @info) = @_;
1233
1234   $self->throw_exception("You failed to provide any connection info")
1235     if !@info;
1236
1237   my ($old_connect_via, $dbh);
1238
1239   local $DBI::connect_via = 'connect' if $INC{'Apache/DBI.pm'} && $ENV{MOD_PERL};
1240
1241   try {
1242     if(ref $info[0] eq 'CODE') {
1243       $dbh = $info[0]->();
1244     }
1245     else {
1246       require DBI;
1247       $dbh = DBI->connect(@info);
1248     }
1249
1250     if (!$dbh) {
1251       die $DBI::errstr;
1252     }
1253
1254     unless ($self->unsafe) {
1255
1256       $self->throw_exception(
1257         'Refusing clobbering of {HandleError} installed on externally supplied '
1258        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1259       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1260
1261       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1262       # request, or an external handle. Complain and set anyway
1263       unless ($dbh->{RaiseError}) {
1264         carp( ref $info[0] eq 'CODE'
1265
1266           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1267            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1268            .'attribute has been supplied'
1269
1270           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1271            .'unsafe => 1. Toggling RaiseError back to true'
1272         );
1273
1274         $dbh->{RaiseError} = 1;
1275       }
1276
1277       # this odd anonymous coderef dereference is in fact really
1278       # necessary to avoid the unwanted effect described in perl5
1279       # RT#75792
1280       sub {
1281         my $weak_self = $_[0];
1282         weaken $weak_self;
1283
1284         # the coderef is blessed so we can distinguish it from externally
1285         # supplied handles (which must be preserved)
1286         $_[1]->{HandleError} = bless sub {
1287           if ($weak_self) {
1288             $weak_self->throw_exception("DBI Exception: $_[0]");
1289           }
1290           else {
1291             # the handler may be invoked by something totally out of
1292             # the scope of DBIC
1293             DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1294           }
1295         }, '__DBIC__DBH__ERROR__HANDLER__';
1296       }->($self, $dbh);
1297     }
1298   }
1299   catch {
1300     $self->throw_exception("DBI Connection failed: $_")
1301   };
1302
1303   $self->_dbh_autocommit($dbh->{AutoCommit});
1304   $dbh;
1305 }
1306
1307 sub txn_begin {
1308   my $self = shift;
1309
1310   # this means we have not yet connected and do not know the AC status
1311   # (e.g. coderef $dbh), need a full-fledged connection check
1312   if (! defined $self->_dbh_autocommit) {
1313     $self->ensure_connected;
1314   }
1315   # Otherwise simply connect or re-connect on pid changes
1316   else {
1317     $self->_get_dbh;
1318   }
1319
1320   $self->next::method(@_);
1321 }
1322
1323 sub _exec_txn_begin {
1324   my $self = shift;
1325
1326   # if the user is utilizing txn_do - good for him, otherwise we need to
1327   # ensure that the $dbh is healthy on BEGIN.
1328   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1329   # will be replaced by a failure of begin_work itself (which will be
1330   # then retried on reconnect)
1331   if ($self->{_in_do_block}) {
1332     $self->_dbh->begin_work;
1333   } else {
1334     $self->dbh_do(sub { $_[1]->begin_work });
1335   }
1336 }
1337
1338 sub txn_commit {
1339   my $self = shift;
1340
1341   $self->_verify_pid if $self->_dbh;
1342   $self->throw_exception("Unable to txn_commit() on a disconnected storage")
1343     unless $self->_dbh;
1344
1345   # esoteric case for folks using external $dbh handles
1346   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1347     carp "Storage transaction_depth 0 does not match "
1348         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1349     $self->transaction_depth(1);
1350   }
1351
1352   $self->next::method(@_);
1353
1354   # if AutoCommit is disabled txn_depth never goes to 0
1355   # as a new txn is started immediately on commit
1356   $self->transaction_depth(1) if (
1357     !$self->transaction_depth
1358       and
1359     defined $self->_dbh_autocommit
1360       and
1361     ! $self->_dbh_autocommit
1362   );
1363 }
1364
1365 sub _exec_txn_commit {
1366   shift->_dbh->commit;
1367 }
1368
1369 sub txn_rollback {
1370   my $self = shift;
1371
1372   $self->_verify_pid if $self->_dbh;
1373   $self->throw_exception("Unable to txn_rollback() on a disconnected storage")
1374     unless $self->_dbh;
1375
1376   # esoteric case for folks using external $dbh handles
1377   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1378     carp "Storage transaction_depth 0 does not match "
1379         ."false AutoCommit of $self->{_dbh}, attempting ROLLBACK anyway";
1380     $self->transaction_depth(1);
1381   }
1382
1383   $self->next::method(@_);
1384
1385   # if AutoCommit is disabled txn_depth never goes to 0
1386   # as a new txn is started immediately on commit
1387   $self->transaction_depth(1) if (
1388     !$self->transaction_depth
1389       and
1390     defined $self->_dbh_autocommit
1391       and
1392     ! $self->_dbh_autocommit
1393   );
1394 }
1395
1396 sub _exec_txn_rollback {
1397   shift->_dbh->rollback;
1398 }
1399
1400 # generate some identical methods
1401 for my $meth (qw/svp_begin svp_release svp_rollback/) {
1402   no strict qw/refs/;
1403   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
1404     my $self = shift;
1405     $self->_verify_pid if $self->_dbh;
1406     $self->throw_exception("Unable to $meth() on a disconnected storage")
1407       unless $self->_dbh;
1408     $self->next::method(@_);
1409   };
1410 }
1411
1412 # This used to be the top-half of _execute.  It was split out to make it
1413 #  easier to override in NoBindVars without duping the rest.  It takes up
1414 #  all of _execute's args, and emits $sql, @bind.
1415 sub _prep_for_execute {
1416   #my ($self, $op, $ident, $args) = @_;
1417   return shift->_gen_sql_bind(@_)
1418 }
1419
1420 sub _gen_sql_bind {
1421   my ($self, $op, $ident, $args) = @_;
1422
1423   my ($sql, @bind) = $self->sql_maker->$op(
1424     blessed($ident) ? $ident->from : $ident,
1425     @$args,
1426   );
1427
1428   if (
1429     ! $ENV{DBIC_DT_SEARCH_OK}
1430       and
1431     $op eq 'select'
1432       and
1433     first { blessed($_->[1]) && $_->[1]->isa('DateTime') } @bind
1434   ) {
1435     carp_unique 'DateTime objects passed to search() are not supported '
1436       . 'properly (InflateColumn::DateTime formats and settings are not '
1437       . 'respected.) See "Formatting DateTime objects in queries" in '
1438       . 'DBIx::Class::Manual::Cookbook. To disable this warning for good '
1439       . 'set $ENV{DBIC_DT_SEARCH_OK} to true'
1440   }
1441
1442   return( $sql, $self->_resolve_bindattrs(
1443     $ident, [ @{$args->[2]{bind}||[]}, @bind ]
1444   ));
1445 }
1446
1447 sub _resolve_bindattrs {
1448   my ($self, $ident, $bind, $colinfos) = @_;
1449
1450   $colinfos ||= {};
1451
1452   my $resolve_bindinfo = sub {
1453     #my $infohash = shift;
1454
1455     %$colinfos = %{ $self->_resolve_column_info($ident) }
1456       unless keys %$colinfos;
1457
1458     my $ret;
1459     if (my $col = $_[0]->{dbic_colname}) {
1460       $ret = { %{$_[0]} };
1461
1462       $ret->{sqlt_datatype} ||= $colinfos->{$col}{data_type}
1463         if $colinfos->{$col}{data_type};
1464
1465       $ret->{sqlt_size} ||= $colinfos->{$col}{size}
1466         if $colinfos->{$col}{size};
1467     }
1468
1469     $ret || $_[0];
1470   };
1471
1472   return [ map {
1473     if (ref $_ ne 'ARRAY') {
1474       [{}, $_]
1475     }
1476     elsif (! defined $_->[0]) {
1477       [{}, $_->[1]]
1478     }
1479     elsif (ref $_->[0] eq 'HASH') {
1480       [
1481         ($_->[0]{dbd_attrs} or $_->[0]{sqlt_datatype}) ? $_->[0] : $resolve_bindinfo->($_->[0]),
1482         $_->[1]
1483       ]
1484     }
1485     elsif (ref $_->[0] eq 'SCALAR') {
1486       [ { sqlt_datatype => ${$_->[0]} }, $_->[1] ]
1487     }
1488     else {
1489       [ $resolve_bindinfo->({ dbic_colname => $_->[0] }), $_->[1] ]
1490     }
1491   } @$bind ];
1492 }
1493
1494 sub _format_for_trace {
1495   #my ($self, $bind) = @_;
1496
1497   ### Turn @bind from something like this:
1498   ###   ( [ "artist", 1 ], [ \%attrs, 3 ] )
1499   ### to this:
1500   ###   ( "'1'", "'3'" )
1501
1502   map {
1503     defined( $_ && $_->[1] )
1504       ? qq{'$_->[1]'}
1505       : q{NULL}
1506   } @{$_[1] || []};
1507 }
1508
1509 sub _query_start {
1510   my ( $self, $sql, $bind ) = @_;
1511
1512   $self->debugobj->query_start( $sql, $self->_format_for_trace($bind) )
1513     if $self->debug;
1514 }
1515
1516 sub _query_end {
1517   my ( $self, $sql, $bind ) = @_;
1518
1519   $self->debugobj->query_end( $sql, $self->_format_for_trace($bind) )
1520     if $self->debug;
1521 }
1522
1523 my $sba_compat;
1524 sub _dbi_attrs_for_bind {
1525   my ($self, $ident, $bind) = @_;
1526
1527   if (! defined $sba_compat) {
1528     $self->_determine_driver;
1529     $sba_compat = $self->can('source_bind_attributes') == \&source_bind_attributes
1530       ? 0
1531       : 1
1532     ;
1533   }
1534
1535   my $sba_attrs;
1536   if ($sba_compat) {
1537     my $class = ref $self;
1538     carp_unique (
1539       "The source_bind_attributes() override in $class relies on a deprecated codepath. "
1540      .'You are strongly advised to switch your code to override bind_attribute_by_datatype() '
1541      .'instead. This legacy compat shim will also disappear some time before DBIC 0.09'
1542     );
1543
1544     my $sba_attrs = $self->source_bind_attributes
1545   }
1546
1547   my @attrs;
1548
1549   for (map { $_->[0] } @$bind) {
1550     push @attrs, do {
1551       if (exists $_->{dbd_attrs}) {
1552         $_->{dbd_attrs}
1553       }
1554       elsif($_->{sqlt_datatype}) {
1555         # cache the result in the dbh_details hash, as it can not change unless
1556         # we connect to something else
1557         my $cache = $self->_dbh_details->{_datatype_map_cache} ||= {};
1558         if (not exists $cache->{$_->{sqlt_datatype}}) {
1559           $cache->{$_->{sqlt_datatype}} = $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
1560         }
1561         $cache->{$_->{sqlt_datatype}};
1562       }
1563       elsif ($sba_attrs and $_->{dbic_colname}) {
1564         $sba_attrs->{$_->{dbic_colname}} || undef;
1565       }
1566       else {
1567         undef;  # always push something at this position
1568       }
1569     }
1570   }
1571
1572   return \@attrs;
1573 }
1574
1575 sub _execute {
1576   my ($self, $op, $ident, @args) = @_;
1577
1578   my ($sql, $bind) = $self->_prep_for_execute($op, $ident, \@args);
1579
1580   shift->dbh_do(    # retry over disconnects
1581     '_dbh_execute',
1582     $sql,
1583     $bind,
1584     $self->_dbi_attrs_for_bind($ident, $bind)
1585   );
1586 }
1587
1588 sub _dbh_execute {
1589   my ($self, undef, $sql, $bind, $bind_attrs) = @_;
1590
1591   $self->_query_start( $sql, $bind );
1592   my $sth = $self->_sth($sql);
1593
1594   for my $i (0 .. $#$bind) {
1595     if (ref $bind->[$i][1] eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1596       $sth->bind_param_inout(
1597         $i + 1, # bind params counts are 1-based
1598         $bind->[$i][1],
1599         $bind->[$i][0]{dbd_size} || $self->_max_column_bytesize($bind->[$i][0]), # size
1600         $bind_attrs->[$i],
1601       );
1602     }
1603     else {
1604       $sth->bind_param(
1605         $i + 1,
1606         (ref $bind->[$i][1] and overload::Method($bind->[$i][1], '""'))
1607           ? "$bind->[$i][1]"
1608           : $bind->[$i][1]
1609         ,
1610         $bind_attrs->[$i],
1611       );
1612     }
1613   }
1614
1615   # Can this fail without throwing an exception anyways???
1616   my $rv = $sth->execute();
1617   $self->throw_exception(
1618     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1619   ) if !$rv;
1620
1621   $self->_query_end( $sql, $bind );
1622
1623   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1624 }
1625
1626 sub _prefetch_autovalues {
1627   my ($self, $source, $to_insert) = @_;
1628
1629   my $colinfo = $source->columns_info;
1630
1631   my %values;
1632   for my $col (keys %$colinfo) {
1633     if (
1634       $colinfo->{$col}{auto_nextval}
1635         and
1636       (
1637         ! exists $to_insert->{$col}
1638           or
1639         ref $to_insert->{$col} eq 'SCALAR'
1640           or
1641         (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1642       )
1643     ) {
1644       $values{$col} = $self->_sequence_fetch(
1645         'NEXTVAL',
1646         ( $colinfo->{$col}{sequence} ||=
1647             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1648         ),
1649       );
1650     }
1651   }
1652
1653   \%values;
1654 }
1655
1656 sub insert {
1657   my ($self, $source, $to_insert) = @_;
1658
1659   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1660
1661   # fuse the values, but keep a separate list of prefetched_values so that
1662   # they can be fused once again with the final return
1663   $to_insert = { %$to_insert, %$prefetched_values };
1664
1665   # FIXME - we seem to assume undef values as non-supplied. This is wrong.
1666   # Investigate what does it take to s/defined/exists/
1667   my $col_infos = $source->columns_info;
1668   my %pcols = map { $_ => 1 } $source->primary_columns;
1669   my (%retrieve_cols, $autoinc_supplied, $retrieve_autoinc_col);
1670   for my $col ($source->columns) {
1671     if ($col_infos->{$col}{is_auto_increment}) {
1672       $autoinc_supplied ||= 1 if defined $to_insert->{$col};
1673       $retrieve_autoinc_col ||= $col unless $autoinc_supplied;
1674     }
1675
1676     # nothing to retrieve when explicit values are supplied
1677     next if (defined $to_insert->{$col} and ! (
1678       ref $to_insert->{$col} eq 'SCALAR'
1679         or
1680       (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1681     ));
1682
1683     # the 'scalar keys' is a trick to preserve the ->columns declaration order
1684     $retrieve_cols{$col} = scalar keys %retrieve_cols if (
1685       $pcols{$col}
1686         or
1687       $col_infos->{$col}{retrieve_on_insert}
1688     );
1689   };
1690
1691   local $self->{_autoinc_supplied_for_op} = $autoinc_supplied;
1692   local $self->{_perform_autoinc_retrieval} = $retrieve_autoinc_col;
1693
1694   my ($sqla_opts, @ir_container);
1695   if (%retrieve_cols and $self->_use_insert_returning) {
1696     $sqla_opts->{returning_container} = \@ir_container
1697       if $self->_use_insert_returning_bound;
1698
1699     $sqla_opts->{returning} = [
1700       sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
1701     ];
1702   }
1703
1704   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
1705
1706   my %returned_cols = %$to_insert;
1707   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
1708     @ir_container = try {
1709       local $SIG{__WARN__} = sub {};
1710       my @r = $sth->fetchrow_array;
1711       $sth->finish;
1712       @r;
1713     } unless @ir_container;
1714
1715     @returned_cols{@$retlist} = @ir_container if @ir_container;
1716   }
1717   else {
1718     # pull in PK if needed and then everything else
1719     if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
1720
1721       $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
1722         unless $self->can('last_insert_id');
1723
1724       my @pri_values = $self->last_insert_id($source, @missing_pri);
1725
1726       $self->throw_exception( "Can't get last insert id" )
1727         unless (@pri_values == @missing_pri);
1728
1729       @returned_cols{@missing_pri} = @pri_values;
1730       delete $retrieve_cols{$_} for @missing_pri;
1731     }
1732
1733     # if there is more left to pull
1734     if (%retrieve_cols) {
1735       $self->throw_exception(
1736         'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
1737       ) unless %pcols;
1738
1739       my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
1740
1741       my $cur = DBIx::Class::ResultSet->new($source, {
1742         where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
1743         select => \@left_to_fetch,
1744       })->cursor;
1745
1746       @returned_cols{@left_to_fetch} = $cur->next;
1747
1748       $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
1749         if scalar $cur->next;
1750     }
1751   }
1752
1753   return { %$prefetched_values, %returned_cols };
1754 }
1755
1756 sub insert_bulk {
1757   my ($self, $source, $cols, $data) = @_;
1758
1759   my @col_range = (0..$#$cols);
1760
1761   # FIXME - perhaps this is not even needed? does DBI stringify?
1762   #
1763   # forcibly stringify whatever is stringifiable
1764   # ResultSet::populate() hands us a copy - safe to mangle
1765   for my $r (0 .. $#$data) {
1766     for my $c (0 .. $#{$data->[$r]}) {
1767       $data->[$r][$c] = "$data->[$r][$c]"
1768         if ( ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
1769     }
1770   }
1771
1772   my $colinfos = $source->columns_info($cols);
1773
1774   local $self->{_autoinc_supplied_for_op} =
1775     (first { $_->{is_auto_increment} } values %$colinfos)
1776       ? 1
1777       : 0
1778   ;
1779
1780   # get a slice type index based on first row of data
1781   # a "column" in this context may refer to more than one bind value
1782   # e.g. \[ '?, ?', [...], [...] ]
1783   #
1784   # construct the value type index - a description of values types for every
1785   # per-column slice of $data:
1786   #
1787   # nonexistent - nonbind literal
1788   # 0 - regular value
1789   # [] of bindattrs - resolved attribute(s) of bind(s) passed via literal+bind \[] combo
1790   #
1791   # also construct the column hash to pass to the SQL generator. For plain
1792   # (non literal) values - convert the members of the first row into a
1793   # literal+bind combo, with extra positional info in the bind attr hashref.
1794   # This will allow us to match the order properly, and is so contrived
1795   # because a user-supplied literal/bind (or something else specific to a
1796   # resultsource and/or storage driver) can inject extra binds along the
1797   # way, so one can't rely on "shift positions" ordering at all. Also we
1798   # can't just hand SQLA a set of some known "values" (e.g. hashrefs that
1799   # can be later matched up by address), because we want to supply a real
1800   # value on which perhaps e.g. datatype checks will be performed
1801   my ($proto_data, $value_type_idx);
1802   for my $i (@col_range) {
1803     my $colname = $cols->[$i];
1804     if (ref $data->[0][$i] eq 'SCALAR') {
1805       # no bind value at all - no type
1806
1807       $proto_data->{$colname} = $data->[0][$i];
1808     }
1809     elsif (ref $data->[0][$i] eq 'REF' and ref ${$data->[0][$i]} eq 'ARRAY' ) {
1810       # repack, so we don't end up mangling the original \[]
1811       my ($sql, @bind) = @${$data->[0][$i]};
1812
1813       # normalization of user supplied stuff
1814       my $resolved_bind = $self->_resolve_bindattrs(
1815         $source, \@bind, $colinfos,
1816       );
1817
1818       # store value-less (attrs only) bind info - we will be comparing all
1819       # supplied binds against this for sanity
1820       $value_type_idx->{$i} = [ map { $_->[0] } @$resolved_bind ];
1821
1822       $proto_data->{$colname} = \[ $sql, map { [
1823         # inject slice order to use for $proto_bind construction
1824           { %{$resolved_bind->[$_][0]}, _bind_data_slice_idx => $i }
1825             =>
1826           $resolved_bind->[$_][1]
1827         ] } (0 .. $#bind)
1828       ];
1829     }
1830     else {
1831       $value_type_idx->{$i} = 0;
1832
1833       $proto_data->{$colname} = \[ '?', [
1834         { dbic_colname => $colname, _bind_data_slice_idx => $i }
1835           =>
1836         $data->[0][$i]
1837       ] ];
1838     }
1839   }
1840
1841   my ($sql, $proto_bind) = $self->_prep_for_execute (
1842     'insert',
1843     $source,
1844     [ $proto_data ],
1845   );
1846
1847   if (! @$proto_bind and keys %$value_type_idx) {
1848     # if the bindlist is empty and we had some dynamic binds, this means the
1849     # storage ate them away (e.g. the NoBindVars component) and interpolated
1850     # them directly into the SQL. This obviously can't be good for multi-inserts
1851     $self->throw_exception('Cannot insert_bulk without support for placeholders');
1852   }
1853
1854   # sanity checks
1855   # FIXME - devise a flag "no babysitting" or somesuch to shut this off
1856   #
1857   # use an error reporting closure for convenience (less to pass)
1858   my $bad_slice_report_cref = sub {
1859     my ($msg, $r_idx, $c_idx) = @_;
1860     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1861       $msg,
1862       $cols->[$c_idx],
1863       do {
1864         require Data::Dumper::Concise;
1865         local $Data::Dumper::Maxdepth = 5;
1866         Data::Dumper::Concise::Dumper ({
1867           map { $cols->[$_] =>
1868             $data->[$r_idx][$_]
1869           } @col_range
1870         }),
1871       }
1872     );
1873   };
1874
1875   for my $col_idx (@col_range) {
1876     my $reference_val = $data->[0][$col_idx];
1877
1878     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
1879       my $val = $data->[$row_idx][$col_idx];
1880
1881       if (! exists $value_type_idx->{$col_idx}) { # literal no binds
1882         if (ref $val ne 'SCALAR') {
1883           $bad_slice_report_cref->(
1884             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
1885             $row_idx,
1886             $col_idx,
1887           );
1888         }
1889         elsif ($$val ne $$reference_val) {
1890           $bad_slice_report_cref->(
1891             "Inconsistent literal SQL value (expecting \\'$$reference_val')",
1892             $row_idx,
1893             $col_idx,
1894           );
1895         }
1896       }
1897       elsif (! $value_type_idx->{$col_idx} ) {  # regular non-literal value
1898         if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
1899           $bad_slice_report_cref->("Literal SQL found where a plain bind value is expected", $row_idx, $col_idx);
1900         }
1901       }
1902       else {  # binds from a \[], compare type and attrs
1903         if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
1904           $bad_slice_report_cref->(
1905             "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
1906             $row_idx,
1907             $col_idx,
1908           );
1909         }
1910         # start drilling down and bail out early on identical refs
1911         elsif (
1912           $reference_val != $val
1913             or
1914           $$reference_val != $$val
1915         ) {
1916           if (${$val}->[0] ne ${$reference_val}->[0]) {
1917             $bad_slice_report_cref->(
1918               "Inconsistent literal/bind SQL (expecting \\['${$reference_val}->[0]', ... ])",
1919               $row_idx,
1920               $col_idx,
1921             );
1922           }
1923           # need to check the bind attrs - a bind will happen only once for
1924           # the entire dataset, so any changes further down will be ignored.
1925           elsif (! Data::Compare::Compare(
1926             $value_type_idx->{$col_idx},
1927             [
1928               map
1929               { $_->[0] }
1930               @{$self->_resolve_bindattrs(
1931                 $source, [ @{$$val}[1 .. $#$$val] ], $colinfos,
1932               )}
1933             ],
1934           )) {
1935             $bad_slice_report_cref->(
1936               'Differing bind attributes on literal/bind values not supported',
1937               $row_idx,
1938               $col_idx,
1939             );
1940           }
1941         }
1942       }
1943     }
1944   }
1945
1946   # neither _dbh_execute_for_fetch, nor _dbh_execute_inserts_with_no_binds
1947   # are atomic (even if execute_for_fetch is a single call). Thus a safety
1948   # scope guard
1949   my $guard = $self->txn_scope_guard;
1950
1951   $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
1952   my $sth = $self->_sth($sql);
1953   my $rv = do {
1954     if (@$proto_bind) {
1955       # proto bind contains the information on which pieces of $data to pull
1956       # $cols is passed in only for prettier error-reporting
1957       $self->_dbh_execute_for_fetch( $source, $sth, $proto_bind, $cols, $data );
1958     }
1959     else {
1960       # bind_param_array doesn't work if there are no binds
1961       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1962     }
1963   };
1964
1965   $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
1966
1967   $guard->commit;
1968
1969   return wantarray ? ($rv, $sth, @$proto_bind) : $rv;
1970 }
1971
1972 # execute_for_fetch is capable of returning data just fine (it means it
1973 # can be used for INSERT...RETURNING and UPDATE...RETURNING. Since this
1974 # is the void-populate fast-path we will just ignore this altogether
1975 # for the time being.
1976 sub _dbh_execute_for_fetch {
1977   my ($self, $source, $sth, $proto_bind, $cols, $data) = @_;
1978
1979   my @idx_range = ( 0 .. $#$proto_bind );
1980
1981   # If we have any bind attributes to take care of, we will bind the
1982   # proto-bind data (which will never be used by execute_for_fetch)
1983   # However since column bindtypes are "sticky", this is sufficient
1984   # to get the DBD to apply the bindtype to all values later on
1985
1986   my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
1987
1988   for my $i (@idx_range) {
1989     $sth->bind_param (
1990       $i+1, # DBI bind indexes are 1-based
1991       $proto_bind->[$i][1],
1992       $bind_attrs->[$i],
1993     ) if defined $bind_attrs->[$i];
1994   }
1995
1996   # At this point $data slots named in the _bind_data_slice_idx of
1997   # each piece of $proto_bind are either \[]s or plain values to be
1998   # passed in. Construct the dispensing coderef. *NOTE* the order
1999   # of $data will differ from this of the ?s in the SQL (due to
2000   # alphabetical ordering by colname). We actually do want to
2001   # preserve this behavior so that prepare_cached has a better
2002   # chance of matching on unrelated calls
2003   my %data_reorder = map { $proto_bind->[$_][0]{_bind_data_slice_idx} => $_ } @idx_range;
2004
2005   my $fetch_row_idx = -1; # saner loop this way
2006   my $fetch_tuple = sub {
2007     return undef if ++$fetch_row_idx > $#$data;
2008
2009     return [ map
2010       { (ref $_ eq 'REF' and ref $$_ eq 'ARRAY')
2011         ? map { $_->[-1] } @{$$_}[1 .. $#$$_]
2012         : $_
2013       }
2014       map
2015         { $data->[$fetch_row_idx][$_]}
2016         sort
2017           { $data_reorder{$a} <=> $data_reorder{$b} }
2018           keys %data_reorder
2019     ];
2020   };
2021
2022   my $tuple_status = [];
2023   my ($rv, $err);
2024   try {
2025     $rv = $sth->execute_for_fetch(
2026       $fetch_tuple,
2027       $tuple_status,
2028     );
2029   }
2030   catch {
2031     $err = shift;
2032   };
2033
2034   # Not all DBDs are create equal. Some throw on error, some return
2035   # an undef $rv, and some set $sth->err - try whatever we can
2036   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
2037     ! defined $err
2038       and
2039     ( !defined $rv or $sth->err )
2040   );
2041
2042   # Statement must finish even if there was an exception.
2043   try {
2044     $sth->finish
2045   }
2046   catch {
2047     $err = shift unless defined $err
2048   };
2049
2050   if (defined $err) {
2051     my $i = 0;
2052     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
2053
2054     $self->throw_exception("Unexpected populate error: $err")
2055       if ($i > $#$tuple_status);
2056
2057     require Data::Dumper::Concise;
2058     $self->throw_exception(sprintf "execute_for_fetch() aborted with '%s' at populate slice:\n%s",
2059       ($tuple_status->[$i][1] || $err),
2060       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
2061     );
2062   }
2063
2064   return $rv;
2065 }
2066
2067 sub _dbh_execute_inserts_with_no_binds {
2068   my ($self, $sth, $count) = @_;
2069
2070   my $err;
2071   try {
2072     my $dbh = $self->_get_dbh;
2073     local $dbh->{RaiseError} = 1;
2074     local $dbh->{PrintError} = 0;
2075
2076     $sth->execute foreach 1..$count;
2077   }
2078   catch {
2079     $err = shift;
2080   };
2081
2082   # Make sure statement is finished even if there was an exception.
2083   try {
2084     $sth->finish
2085   }
2086   catch {
2087     $err = shift unless defined $err;
2088   };
2089
2090   $self->throw_exception($err) if defined $err;
2091
2092   return $count;
2093 }
2094
2095 sub update {
2096   #my ($self, $source, @args) = @_;
2097   shift->_execute('update', @_);
2098 }
2099
2100
2101 sub delete {
2102   #my ($self, $source, @args) = @_;
2103   shift->_execute('delete', @_);
2104 }
2105
2106 sub _select {
2107   my $self = shift;
2108   $self->_execute($self->_select_args(@_));
2109 }
2110
2111 sub _select_args_to_query {
2112   my $self = shift;
2113
2114   # my ($op, $ident, $select, $cond, $rs_attrs, $rows, $offset)
2115   #  = $self->_select_args($ident, $select, $cond, $attrs);
2116   my ($op, $ident, @args) =
2117     $self->_select_args(@_);
2118
2119   # my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2120   my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, \@args);
2121   $prepared_bind ||= [];
2122
2123   return wantarray
2124     ? ($sql, $prepared_bind)
2125     : \[ "($sql)", @$prepared_bind ]
2126   ;
2127 }
2128
2129 sub _select_args {
2130   my ($self, $ident, $select, $where, $attrs) = @_;
2131
2132   my $sql_maker = $self->sql_maker;
2133   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2134
2135   $attrs = {
2136     %$attrs,
2137     select => $select,
2138     from => $ident,
2139     where => $where,
2140     $rs_alias && $alias2source->{$rs_alias}
2141       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2142       : ()
2143     ,
2144   };
2145
2146   # Sanity check the attributes (SQLMaker does it too, but
2147   # in case of a software_limit we'll never reach there)
2148   if (defined $attrs->{offset}) {
2149     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2150       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2151   }
2152
2153   if (defined $attrs->{rows}) {
2154     $self->throw_exception("The rows attribute must be a positive integer if present")
2155       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2156   }
2157   elsif ($attrs->{offset}) {
2158     # MySQL actually recommends this approach.  I cringe.
2159     $attrs->{rows} = $sql_maker->__max_int;
2160   }
2161
2162   my @limit;
2163
2164   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2165   # storage, unless software limit was requested
2166   if (
2167     #limited has_many
2168     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2169        ||
2170     # grouped prefetch (to satisfy group_by == select)
2171     ( $attrs->{group_by}
2172         &&
2173       @{$attrs->{group_by}}
2174         &&
2175       $attrs->{_prefetch_selector_range}
2176     )
2177   ) {
2178     ($ident, $select, $where, $attrs)
2179       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2180   }
2181   elsif (! $attrs->{software_limit} ) {
2182     push @limit, (
2183       $attrs->{rows} || (),
2184       $attrs->{offset} || (),
2185     );
2186   }
2187
2188   # try to simplify the joinmap further (prune unreferenced type-single joins)
2189   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2190
2191 ###
2192   # This would be the point to deflate anything found in $where
2193   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2194   # expect a row object. And all we have is a resultsource (it is trivial
2195   # to extract deflator coderefs via $alias2source above).
2196   #
2197   # I don't see a way forward other than changing the way deflators are
2198   # invoked, and that's just bad...
2199 ###
2200
2201   return ('select', $ident, $select, $where, $attrs, @limit);
2202 }
2203
2204 # Returns a counting SELECT for a simple count
2205 # query. Abstracted so that a storage could override
2206 # this to { count => 'firstcol' } or whatever makes
2207 # sense as a performance optimization
2208 sub _count_select {
2209   #my ($self, $source, $rs_attrs) = @_;
2210   return { count => '*' };
2211 }
2212
2213 sub source_bind_attributes {
2214   shift->throw_exception(
2215     'source_bind_attributes() was never meant to be a callable public method - '
2216    .'please contact the DBIC dev-team and describe your use case so that a reasonable '
2217    .'solution can be provided'
2218    ."\nhttp://search.cpan.org/dist/DBIx-Class/lib/DBIx/Class.pm#GETTING_HELP/SUPPORT"
2219   );
2220 }
2221
2222 =head2 select
2223
2224 =over 4
2225
2226 =item Arguments: $ident, $select, $condition, $attrs
2227
2228 =back
2229
2230 Handle a SQL select statement.
2231
2232 =cut
2233
2234 sub select {
2235   my $self = shift;
2236   my ($ident, $select, $condition, $attrs) = @_;
2237   return $self->cursor_class->new($self, \@_, $attrs);
2238 }
2239
2240 sub select_single {
2241   my $self = shift;
2242   my ($rv, $sth, @bind) = $self->_select(@_);
2243   my @row = $sth->fetchrow_array;
2244   my @nextrow = $sth->fetchrow_array if @row;
2245   if(@row && @nextrow) {
2246     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2247   }
2248   # Need to call finish() to work round broken DBDs
2249   $sth->finish();
2250   return @row;
2251 }
2252
2253 =head2 sql_limit_dialect
2254
2255 This is an accessor for the default SQL limit dialect used by a particular
2256 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2257 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2258 see L<DBIx::Class::SQLMaker::LimitDialects>.
2259
2260 =cut
2261
2262 sub _dbh_sth {
2263   my ($self, $dbh, $sql) = @_;
2264
2265   # 3 is the if_active parameter which avoids active sth re-use
2266   my $sth = $self->disable_sth_caching
2267     ? $dbh->prepare($sql)
2268     : $dbh->prepare_cached($sql, {}, 3);
2269
2270   # XXX You would think RaiseError would make this impossible,
2271   #  but apparently that's not true :(
2272   $self->throw_exception(
2273     $dbh->errstr
2274       ||
2275     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
2276             .'an exception and/or setting $dbh->errstr',
2277       length ($sql) > 20
2278         ? substr($sql, 0, 20) . '...'
2279         : $sql
2280       ,
2281       'DBD::' . $dbh->{Driver}{Name},
2282     )
2283   ) if !$sth;
2284
2285   $sth;
2286 }
2287
2288 sub sth {
2289   carp_unique 'sth was mistakenly marked/documented as public, stop calling it (will be removed before DBIC v0.09)';
2290   shift->_sth(@_);
2291 }
2292
2293 sub _sth {
2294   my ($self, $sql) = @_;
2295   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2296 }
2297
2298 sub _dbh_columns_info_for {
2299   my ($self, $dbh, $table) = @_;
2300
2301   if ($dbh->can('column_info')) {
2302     my %result;
2303     my $caught;
2304     try {
2305       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2306       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2307       $sth->execute();
2308       while ( my $info = $sth->fetchrow_hashref() ){
2309         my %column_info;
2310         $column_info{data_type}   = $info->{TYPE_NAME};
2311         $column_info{size}      = $info->{COLUMN_SIZE};
2312         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2313         $column_info{default_value} = $info->{COLUMN_DEF};
2314         my $col_name = $info->{COLUMN_NAME};
2315         $col_name =~ s/^\"(.*)\"$/$1/;
2316
2317         $result{$col_name} = \%column_info;
2318       }
2319     } catch {
2320       $caught = 1;
2321     };
2322     return \%result if !$caught && scalar keys %result;
2323   }
2324
2325   my %result;
2326   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2327   $sth->execute;
2328   my @columns = @{$sth->{NAME_lc}};
2329   for my $i ( 0 .. $#columns ){
2330     my %column_info;
2331     $column_info{data_type} = $sth->{TYPE}->[$i];
2332     $column_info{size} = $sth->{PRECISION}->[$i];
2333     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2334
2335     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2336       $column_info{data_type} = $1;
2337       $column_info{size}    = $2;
2338     }
2339
2340     $result{$columns[$i]} = \%column_info;
2341   }
2342   $sth->finish;
2343
2344   foreach my $col (keys %result) {
2345     my $colinfo = $result{$col};
2346     my $type_num = $colinfo->{data_type};
2347     my $type_name;
2348     if(defined $type_num && $dbh->can('type_info')) {
2349       my $type_info = $dbh->type_info($type_num);
2350       $type_name = $type_info->{TYPE_NAME} if $type_info;
2351       $colinfo->{data_type} = $type_name if $type_name;
2352     }
2353   }
2354
2355   return \%result;
2356 }
2357
2358 sub columns_info_for {
2359   my ($self, $table) = @_;
2360   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2361 }
2362
2363 =head2 last_insert_id
2364
2365 Return the row id of the last insert.
2366
2367 =cut
2368
2369 sub _dbh_last_insert_id {
2370     my ($self, $dbh, $source, $col) = @_;
2371
2372     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2373
2374     return $id if defined $id;
2375
2376     my $class = ref $self;
2377     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2378 }
2379
2380 sub last_insert_id {
2381   my $self = shift;
2382   $self->_dbh_last_insert_id ($self->_dbh, @_);
2383 }
2384
2385 =head2 _native_data_type
2386
2387 =over 4
2388
2389 =item Arguments: $type_name
2390
2391 =back
2392
2393 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2394 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2395 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2396
2397 The default implementation returns C<undef>, implement in your Storage driver if
2398 you need this functionality.
2399
2400 Should map types from other databases to the native RDBMS type, for example
2401 C<VARCHAR2> to C<VARCHAR>.
2402
2403 Types with modifiers should map to the underlying data type. For example,
2404 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2405
2406 Composite types should map to the container type, for example
2407 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2408
2409 =cut
2410
2411 sub _native_data_type {
2412   #my ($self, $data_type) = @_;
2413   return undef
2414 }
2415
2416 # Check if placeholders are supported at all
2417 sub _determine_supports_placeholders {
2418   my $self = shift;
2419   my $dbh  = $self->_get_dbh;
2420
2421   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2422   # but it is inaccurate more often than not
2423   return try {
2424     local $dbh->{PrintError} = 0;
2425     local $dbh->{RaiseError} = 1;
2426     $dbh->do('select ?', {}, 1);
2427     1;
2428   }
2429   catch {
2430     0;
2431   };
2432 }
2433
2434 # Check if placeholders bound to non-string types throw exceptions
2435 #
2436 sub _determine_supports_typeless_placeholders {
2437   my $self = shift;
2438   my $dbh  = $self->_get_dbh;
2439
2440   return try {
2441     local $dbh->{PrintError} = 0;
2442     local $dbh->{RaiseError} = 1;
2443     # this specifically tests a bind that is NOT a string
2444     $dbh->do('select 1 where 1 = ?', {}, 1);
2445     1;
2446   }
2447   catch {
2448     0;
2449   };
2450 }
2451
2452 =head2 sqlt_type
2453
2454 Returns the database driver name.
2455
2456 =cut
2457
2458 sub sqlt_type {
2459   shift->_get_dbh->{Driver}->{Name};
2460 }
2461
2462 =head2 bind_attribute_by_data_type
2463
2464 Given a datatype from column info, returns a database specific bind
2465 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2466 let the database planner just handle it.
2467
2468 Generally only needed for special case column types, like bytea in postgres.
2469
2470 =cut
2471
2472 sub bind_attribute_by_data_type {
2473     return;
2474 }
2475
2476 =head2 is_datatype_numeric
2477
2478 Given a datatype from column_info, returns a boolean value indicating if
2479 the current RDBMS considers it a numeric value. This controls how
2480 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2481 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2482 be performed instead of the usual C<eq>.
2483
2484 =cut
2485
2486 sub is_datatype_numeric {
2487   #my ($self, $dt) = @_;
2488
2489   return 0 unless $_[1];
2490
2491   $_[1] =~ /^ (?:
2492     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2493   ) $/ix;
2494 }
2495
2496
2497 =head2 create_ddl_dir
2498
2499 =over 4
2500
2501 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2502
2503 =back
2504
2505 Creates a SQL file based on the Schema, for each of the specified
2506 database engines in C<\@databases> in the given directory.
2507 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2508
2509 Given a previous version number, this will also create a file containing
2510 the ALTER TABLE statements to transform the previous schema into the
2511 current one. Note that these statements may contain C<DROP TABLE> or
2512 C<DROP COLUMN> statements that can potentially destroy data.
2513
2514 The file names are created using the C<ddl_filename> method below, please
2515 override this method in your schema if you would like a different file
2516 name format. For the ALTER file, the same format is used, replacing
2517 $version in the name with "$preversion-$version".
2518
2519 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2520 The most common value for this would be C<< { add_drop_table => 1 } >>
2521 to have the SQL produced include a C<DROP TABLE> statement for each table
2522 created. For quoting purposes supply C<quote_table_names> and
2523 C<quote_field_names>.
2524
2525 If no arguments are passed, then the following default values are assumed:
2526
2527 =over 4
2528
2529 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2530
2531 =item version    - $schema->schema_version
2532
2533 =item directory  - './'
2534
2535 =item preversion - <none>
2536
2537 =back
2538
2539 By default, C<\%sqlt_args> will have
2540
2541  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2542
2543 merged with the hash passed in. To disable any of those features, pass in a
2544 hashref like the following
2545
2546  { ignore_constraint_names => 0, # ... other options }
2547
2548
2549 WARNING: You are strongly advised to check all SQL files created, before applying
2550 them.
2551
2552 =cut
2553
2554 sub create_ddl_dir {
2555   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2556
2557   unless ($dir) {
2558     carp "No directory given, using ./\n";
2559     $dir = './';
2560   } else {
2561       -d $dir
2562         or
2563       (require File::Path and File::Path::make_path ("$dir"))  # make_path does not like objects (i.e. Path::Class::Dir)
2564         or
2565       $self->throw_exception(
2566         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2567       );
2568   }
2569
2570   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2571
2572   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2573   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2574
2575   my $schema_version = $schema->schema_version || '1.x';
2576   $version ||= $schema_version;
2577
2578   $sqltargs = {
2579     add_drop_table => 1,
2580     ignore_constraint_names => 1,
2581     ignore_index_names => 1,
2582     %{$sqltargs || {}}
2583   };
2584
2585   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2586     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2587   }
2588
2589   my $sqlt = SQL::Translator->new( $sqltargs );
2590
2591   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2592   my $sqlt_schema = $sqlt->translate({ data => $schema })
2593     or $self->throw_exception ($sqlt->error);
2594
2595   foreach my $db (@$databases) {
2596     $sqlt->reset();
2597     $sqlt->{schema} = $sqlt_schema;
2598     $sqlt->producer($db);
2599
2600     my $file;
2601     my $filename = $schema->ddl_filename($db, $version, $dir);
2602     if (-e $filename && ($version eq $schema_version )) {
2603       # if we are dumping the current version, overwrite the DDL
2604       carp "Overwriting existing DDL file - $filename";
2605       unlink($filename);
2606     }
2607
2608     my $output = $sqlt->translate;
2609     if(!$output) {
2610       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2611       next;
2612     }
2613     if(!open($file, ">$filename")) {
2614       $self->throw_exception("Can't open $filename for writing ($!)");
2615       next;
2616     }
2617     print $file $output;
2618     close($file);
2619
2620     next unless ($preversion);
2621
2622     require SQL::Translator::Diff;
2623
2624     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2625     if(!-e $prefilename) {
2626       carp("No previous schema file found ($prefilename)");
2627       next;
2628     }
2629
2630     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2631     if(-e $difffile) {
2632       carp("Overwriting existing diff file - $difffile");
2633       unlink($difffile);
2634     }
2635
2636     my $source_schema;
2637     {
2638       my $t = SQL::Translator->new($sqltargs);
2639       $t->debug( 0 );
2640       $t->trace( 0 );
2641
2642       $t->parser( $db )
2643         or $self->throw_exception ($t->error);
2644
2645       my $out = $t->translate( $prefilename )
2646         or $self->throw_exception ($t->error);
2647
2648       $source_schema = $t->schema;
2649
2650       $source_schema->name( $prefilename )
2651         unless ( $source_schema->name );
2652     }
2653
2654     # The "new" style of producers have sane normalization and can support
2655     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2656     # And we have to diff parsed SQL against parsed SQL.
2657     my $dest_schema = $sqlt_schema;
2658
2659     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2660       my $t = SQL::Translator->new($sqltargs);
2661       $t->debug( 0 );
2662       $t->trace( 0 );
2663
2664       $t->parser( $db )
2665         or $self->throw_exception ($t->error);
2666
2667       my $out = $t->translate( $filename )
2668         or $self->throw_exception ($t->error);
2669
2670       $dest_schema = $t->schema;
2671
2672       $dest_schema->name( $filename )
2673         unless $dest_schema->name;
2674     }
2675
2676     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2677                                                   $dest_schema,   $db,
2678                                                   $sqltargs
2679                                                  );
2680     if(!open $file, ">$difffile") {
2681       $self->throw_exception("Can't write to $difffile ($!)");
2682       next;
2683     }
2684     print $file $diff;
2685     close($file);
2686   }
2687 }
2688
2689 =head2 deployment_statements
2690
2691 =over 4
2692
2693 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2694
2695 =back
2696
2697 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2698
2699 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2700 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2701
2702 C<$directory> is used to return statements from files in a previously created
2703 L</create_ddl_dir> directory and is optional. The filenames are constructed
2704 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2705
2706 If no C<$directory> is specified then the statements are constructed on the
2707 fly using L<SQL::Translator> and C<$version> is ignored.
2708
2709 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2710
2711 =cut
2712
2713 sub deployment_statements {
2714   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2715   $type ||= $self->sqlt_type;
2716   $version ||= $schema->schema_version || '1.x';
2717   $dir ||= './';
2718   my $filename = $schema->ddl_filename($type, $version, $dir);
2719   if(-f $filename)
2720   {
2721       # FIXME replace this block when a proper sane sql parser is available
2722       my $file;
2723       open($file, "<$filename")
2724         or $self->throw_exception("Can't open $filename ($!)");
2725       my @rows = <$file>;
2726       close($file);
2727       return join('', @rows);
2728   }
2729
2730   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2731     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2732   }
2733
2734   # sources needs to be a parser arg, but for simplicty allow at top level
2735   # coming in
2736   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2737       if exists $sqltargs->{sources};
2738
2739   my $tr = SQL::Translator->new(
2740     producer => "SQL::Translator::Producer::${type}",
2741     %$sqltargs,
2742     parser => 'SQL::Translator::Parser::DBIx::Class',
2743     data => $schema,
2744   );
2745
2746   return preserve_context {
2747     $tr->translate
2748   } after => sub {
2749     $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2750       unless defined $_[0];
2751   };
2752 }
2753
2754 # FIXME deploy() currently does not accurately report sql errors
2755 # Will always return true while errors are warned
2756 sub deploy {
2757   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2758   my $deploy = sub {
2759     my $line = shift;
2760     return if(!$line);
2761     return if($line =~ /^--/);
2762     # next if($line =~ /^DROP/m);
2763     return if($line =~ /^BEGIN TRANSACTION/m);
2764     return if($line =~ /^COMMIT/m);
2765     return if $line =~ /^\s+$/; # skip whitespace only
2766     $self->_query_start($line);
2767     try {
2768       # do a dbh_do cycle here, as we need some error checking in
2769       # place (even though we will ignore errors)
2770       $self->dbh_do (sub { $_[1]->do($line) });
2771     } catch {
2772       carp qq{$_ (running "${line}")};
2773     };
2774     $self->_query_end($line);
2775   };
2776   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2777   if (@statements > 1) {
2778     foreach my $statement (@statements) {
2779       $deploy->( $statement );
2780     }
2781   }
2782   elsif (@statements == 1) {
2783     # split on single line comments and end of statements
2784     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
2785       $deploy->( $line );
2786     }
2787   }
2788 }
2789
2790 =head2 datetime_parser
2791
2792 Returns the datetime parser class
2793
2794 =cut
2795
2796 sub datetime_parser {
2797   my $self = shift;
2798   return $self->{datetime_parser} ||= do {
2799     $self->build_datetime_parser(@_);
2800   };
2801 }
2802
2803 =head2 datetime_parser_type
2804
2805 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
2806
2807 =head2 build_datetime_parser
2808
2809 See L</datetime_parser>
2810
2811 =cut
2812
2813 sub build_datetime_parser {
2814   my $self = shift;
2815   my $type = $self->datetime_parser_type(@_);
2816   return $type;
2817 }
2818
2819
2820 =head2 is_replicating
2821
2822 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2823 replicate from a master database.  Default is undef, which is the result
2824 returned by databases that don't support replication.
2825
2826 =cut
2827
2828 sub is_replicating {
2829     return;
2830
2831 }
2832
2833 =head2 lag_behind_master
2834
2835 Returns a number that represents a certain amount of lag behind a master db
2836 when a given storage is replicating.  The number is database dependent, but
2837 starts at zero and increases with the amount of lag. Default in undef
2838
2839 =cut
2840
2841 sub lag_behind_master {
2842     return;
2843 }
2844
2845 =head2 relname_to_table_alias
2846
2847 =over 4
2848
2849 =item Arguments: $relname, $join_count
2850
2851 =back
2852
2853 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2854 queries.
2855
2856 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2857 way these aliases are named.
2858
2859 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2860 otherwise C<"$relname">.
2861
2862 =cut
2863
2864 sub relname_to_table_alias {
2865   my ($self, $relname, $join_count) = @_;
2866
2867   my $alias = ($join_count && $join_count > 1 ?
2868     join('_', $relname, $join_count) : $relname);
2869
2870   return $alias;
2871 }
2872
2873 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2874 # version and it may be necessary to amend or override it for a specific storage
2875 # if such binds are necessary.
2876 sub _max_column_bytesize {
2877   my ($self, $attr) = @_;
2878
2879   my $max_size;
2880
2881   if ($attr->{sqlt_datatype}) {
2882     my $data_type = lc($attr->{sqlt_datatype});
2883
2884     if ($attr->{sqlt_size}) {
2885
2886       # String/sized-binary types
2887       if ($data_type =~ /^(?:
2888           l? (?:var)? char(?:acter)? (?:\s*varying)?
2889             |
2890           (?:var)? binary (?:\s*varying)?
2891             |
2892           raw
2893         )\b/x
2894       ) {
2895         $max_size = $attr->{sqlt_size};
2896       }
2897       # Other charset/unicode types, assume scale of 4
2898       elsif ($data_type =~ /^(?:
2899           national \s* character (?:\s*varying)?
2900             |
2901           nchar
2902             |
2903           univarchar
2904             |
2905           nvarchar
2906         )\b/x
2907       ) {
2908         $max_size = $attr->{sqlt_size} * 4;
2909       }
2910     }
2911
2912     if (!$max_size and !$self->_is_lob_type($data_type)) {
2913       $max_size = 100 # for all other (numeric?) datatypes
2914     }
2915   }
2916
2917   $max_size || $self->_dbic_connect_attributes->{LongReadLen} || $self->_get_dbh->{LongReadLen} || 8000;
2918 }
2919
2920 # Determine if a data_type is some type of BLOB
2921 sub _is_lob_type {
2922   my ($self, $data_type) = @_;
2923   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
2924     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
2925                                   |varchar|character\s*varying|nvarchar
2926                                   |national\s*character\s*varying))?\z/xi);
2927 }
2928
2929 sub _is_binary_lob_type {
2930   my ($self, $data_type) = @_;
2931   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
2932     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
2933 }
2934
2935 sub _is_text_lob_type {
2936   my ($self, $data_type) = @_;
2937   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
2938     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
2939                         |national\s*character\s*varying))\z/xi);
2940 }
2941
2942 1;
2943
2944 =head1 USAGE NOTES
2945
2946 =head2 DBIx::Class and AutoCommit
2947
2948 DBIx::Class can do some wonderful magic with handling exceptions,
2949 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2950 (the default) combined with L<txn_do|DBIx::Class::Storage/txn_do> for
2951 transaction support.
2952
2953 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2954 in an assumed transaction between commits, and you're telling us you'd
2955 like to manage that manually.  A lot of the magic protections offered by
2956 this module will go away.  We can't protect you from exceptions due to database
2957 disconnects because we don't know anything about how to restart your
2958 transactions.  You're on your own for handling all sorts of exceptional
2959 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2960 be with raw DBI.
2961
2962
2963 =head1 AUTHORS
2964
2965 Matt S. Trout <mst@shadowcatsystems.co.uk>
2966
2967 Andy Grundman <andy@hybridized.org>
2968
2969 =head1 LICENSE
2970
2971 You may distribute this code under the same terms as Perl itself.
2972
2973 =cut