d9fa57ac85201376663574f81e6df45770c5e237
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use DBIx::Class::Exception;
12 use Scalar::Util qw/refaddr weaken reftype blessed/;
13 use List::Util qw/first/;
14 use Sub::Name 'subname';
15 use Try::Tiny;
16 use overload ();
17 use Data::Compare (); # no imports!!! guard against insane architecture
18 use namespace::clean;
19
20 # default cursor class, overridable in connect_info attributes
21 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
22
23 __PACKAGE__->mk_group_accessors('inherited' => qw/
24   sql_limit_dialect sql_quote_char sql_name_sep
25 /);
26
27 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
28
29 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
30 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
31
32 __PACKAGE__->sql_name_sep('.');
33
34 __PACKAGE__->mk_group_accessors('simple' => qw/
35   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
36   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts _dbh_autocommit
37   _perform_autoinc_retrieval _autoinc_supplied_for_op
38 /);
39
40 # the values for these accessors are picked out (and deleted) from
41 # the attribute hashref passed to connect_info
42 my @storage_options = qw/
43   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
44   disable_sth_caching unsafe auto_savepoint
45 /;
46 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
47
48
49 # capability definitions, using a 2-tiered accessor system
50 # The rationale is:
51 #
52 # A driver/user may define _use_X, which blindly without any checks says:
53 # "(do not) use this capability", (use_dbms_capability is an "inherited"
54 # type accessor)
55 #
56 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
57 # accessor, which in turn calls _determine_supports_X, and stores the return
58 # in a special slot on the storage object, which is wiped every time a $dbh
59 # reconnection takes place (it is not guaranteed that upon reconnection we
60 # will get the same rdbms version). _determine_supports_X does not need to
61 # exist on a driver, as we ->can for it before calling.
62
63 my @capabilities = (qw/
64   insert_returning
65   insert_returning_bound
66
67   multicolumn_in
68
69   placeholders
70   typeless_placeholders
71
72   join_optimizer
73 /);
74 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
75 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
76
77 # on by default, not strictly a capability (pending rewrite)
78 __PACKAGE__->_use_join_optimizer (1);
79 sub _determine_supports_join_optimizer { 1 };
80
81 # Each of these methods need _determine_driver called before itself
82 # in order to function reliably. This is a purely DRY optimization
83 #
84 # get_(use)_dbms_capability need to be called on the correct Storage
85 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
86 # _determine_supports_X which obv. needs a correct driver as well
87 my @rdbms_specific_methods = qw/
88   deployment_statements
89   sqlt_type
90   sql_maker
91   build_datetime_parser
92   datetime_parser_type
93
94   txn_begin
95   insert
96   insert_bulk
97   update
98   delete
99   select
100   select_single
101   with_deferred_fk_checks
102
103   get_use_dbms_capability
104   get_dbms_capability
105
106   _server_info
107   _get_server_version
108 /;
109
110 for my $meth (@rdbms_specific_methods) {
111
112   my $orig = __PACKAGE__->can ($meth)
113     or die "$meth is not a ::Storage::DBI method!";
114
115   no strict qw/refs/;
116   no warnings qw/redefine/;
117   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
118     if (
119       # only fire when invoked on an instance, a valid class-based invocation
120       # would e.g. be setting a default for an inherited accessor
121       ref $_[0]
122         and
123       ! $_[0]->_driver_determined
124         and
125       ! $_[0]->{_in_determine_driver}
126     ) {
127       $_[0]->_determine_driver;
128
129       # This for some reason crashes and burns on perl 5.8.1
130       # IFF the method ends up throwing an exception
131       #goto $_[0]->can ($meth);
132
133       my $cref = $_[0]->can ($meth);
134       goto $cref;
135     }
136
137     goto $orig;
138   };
139 }
140
141 =head1 NAME
142
143 DBIx::Class::Storage::DBI - DBI storage handler
144
145 =head1 SYNOPSIS
146
147   my $schema = MySchema->connect('dbi:SQLite:my.db');
148
149   $schema->storage->debug(1);
150
151   my @stuff = $schema->storage->dbh_do(
152     sub {
153       my ($storage, $dbh, @args) = @_;
154       $dbh->do("DROP TABLE authors");
155     },
156     @column_list
157   );
158
159   $schema->resultset('Book')->search({
160      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
161   });
162
163 =head1 DESCRIPTION
164
165 This class represents the connection to an RDBMS via L<DBI>.  See
166 L<DBIx::Class::Storage> for general information.  This pod only
167 documents DBI-specific methods and behaviors.
168
169 =head1 METHODS
170
171 =cut
172
173 sub new {
174   my $new = shift->next::method(@_);
175
176   $new->_sql_maker_opts({});
177   $new->_dbh_details({});
178   $new->{_in_do_block} = 0;
179   $new->{_dbh_gen} = 0;
180
181   # read below to see what this does
182   $new->_arm_global_destructor;
183
184   $new;
185 }
186
187 # This is hack to work around perl shooting stuff in random
188 # order on exit(). If we do not walk the remaining storage
189 # objects in an END block, there is a *small but real* chance
190 # of a fork()ed child to kill the parent's shared DBI handle,
191 # *before perl reaches the DESTROY in this package*
192 # Yes, it is ugly and effective.
193 # Additionally this registry is used by the CLONE method to
194 # make sure no handles are shared between threads
195 {
196   my %seek_and_destroy;
197
198   sub _arm_global_destructor {
199     my $self = shift;
200     my $key = refaddr ($self);
201     $seek_and_destroy{$key} = $self;
202     weaken ($seek_and_destroy{$key});
203   }
204
205   END {
206     local $?; # just in case the DBI destructor changes it somehow
207
208     # destroy just the object if not native to this process/thread
209     $_->_verify_pid for (grep
210       { defined $_ }
211       values %seek_and_destroy
212     );
213   }
214
215   sub CLONE {
216     # As per DBI's recommendation, DBIC disconnects all handles as
217     # soon as possible (DBIC will reconnect only on demand from within
218     # the thread)
219     for (values %seek_and_destroy) {
220       next unless $_;
221       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
222       $_->_dbh(undef);
223
224       $_->transaction_depth(0);
225       $_->savepoints([]);
226     }
227   }
228 }
229
230 sub DESTROY {
231   my $self = shift;
232
233   # some databases spew warnings on implicit disconnect
234   $self->_verify_pid;
235   local $SIG{__WARN__} = sub {};
236   $self->_dbh(undef);
237
238   # this op is necessary, since the very last perl runtime statement
239   # triggers a global destruction shootout, and the $SIG localization
240   # may very well be destroyed before perl actually gets to do the
241   # $dbh undef
242   1;
243 }
244
245 # handle pid changes correctly - do not destroy parent's connection
246 sub _verify_pid {
247   my $self = shift;
248
249   my $pid = $self->_conn_pid;
250   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
251     $dbh->{InactiveDestroy} = 1;
252     $self->{_dbh_gen}++;
253     $self->_dbh(undef);
254     $self->transaction_depth(0);
255     $self->savepoints([]);
256   }
257
258   return;
259 }
260
261 =head2 connect_info
262
263 This method is normally called by L<DBIx::Class::Schema/connection>, which
264 encapsulates its argument list in an arrayref before passing them here.
265
266 The argument list may contain:
267
268 =over
269
270 =item *
271
272 The same 4-element argument set one would normally pass to
273 L<DBI/connect>, optionally followed by
274 L<extra attributes|/DBIx::Class specific connection attributes>
275 recognized by DBIx::Class:
276
277   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
278
279 =item *
280
281 A single code reference which returns a connected
282 L<DBI database handle|DBI/connect> optionally followed by
283 L<extra attributes|/DBIx::Class specific connection attributes> recognized
284 by DBIx::Class:
285
286   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
287
288 =item *
289
290 A single hashref with all the attributes and the dsn/user/password
291 mixed together:
292
293   $connect_info_args = [{
294     dsn => $dsn,
295     user => $user,
296     password => $pass,
297     %dbi_attributes,
298     %extra_attributes,
299   }];
300
301   $connect_info_args = [{
302     dbh_maker => sub { DBI->connect (...) },
303     %dbi_attributes,
304     %extra_attributes,
305   }];
306
307 This is particularly useful for L<Catalyst> based applications, allowing the
308 following config (L<Config::General> style):
309
310   <Model::DB>
311     schema_class   App::DB
312     <connect_info>
313       dsn          dbi:mysql:database=test
314       user         testuser
315       password     TestPass
316       AutoCommit   1
317     </connect_info>
318   </Model::DB>
319
320 The C<dsn>/C<user>/C<password> combination can be substituted by the
321 C<dbh_maker> key whose value is a coderef that returns a connected
322 L<DBI database handle|DBI/connect>
323
324 =back
325
326 Please note that the L<DBI> docs recommend that you always explicitly
327 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
328 recommends that it be set to I<1>, and that you perform transactions
329 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
330 to I<1> if you do not do explicitly set it to zero.  This is the default
331 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
332
333 =head3 DBIx::Class specific connection attributes
334
335 In addition to the standard L<DBI|DBI/ATTRIBUTES COMMON TO ALL HANDLES>
336 L<connection|DBI/Database Handle Attributes> attributes, DBIx::Class recognizes
337 the following connection options. These options can be mixed in with your other
338 L<DBI> connection attributes, or placed in a separate hashref
339 (C<\%extra_attributes>) as shown above.
340
341 Every time C<connect_info> is invoked, any previous settings for
342 these options will be cleared before setting the new ones, regardless of
343 whether any options are specified in the new C<connect_info>.
344
345
346 =over
347
348 =item on_connect_do
349
350 Specifies things to do immediately after connecting or re-connecting to
351 the database.  Its value may contain:
352
353 =over
354
355 =item a scalar
356
357 This contains one SQL statement to execute.
358
359 =item an array reference
360
361 This contains SQL statements to execute in order.  Each element contains
362 a string or a code reference that returns a string.
363
364 =item a code reference
365
366 This contains some code to execute.  Unlike code references within an
367 array reference, its return value is ignored.
368
369 =back
370
371 =item on_disconnect_do
372
373 Takes arguments in the same form as L</on_connect_do> and executes them
374 immediately before disconnecting from the database.
375
376 Note, this only runs if you explicitly call L</disconnect> on the
377 storage object.
378
379 =item on_connect_call
380
381 A more generalized form of L</on_connect_do> that calls the specified
382 C<connect_call_METHOD> methods in your storage driver.
383
384   on_connect_do => 'select 1'
385
386 is equivalent to:
387
388   on_connect_call => [ [ do_sql => 'select 1' ] ]
389
390 Its values may contain:
391
392 =over
393
394 =item a scalar
395
396 Will call the C<connect_call_METHOD> method.
397
398 =item a code reference
399
400 Will execute C<< $code->($storage) >>
401
402 =item an array reference
403
404 Each value can be a method name or code reference.
405
406 =item an array of arrays
407
408 For each array, the first item is taken to be the C<connect_call_> method name
409 or code reference, and the rest are parameters to it.
410
411 =back
412
413 Some predefined storage methods you may use:
414
415 =over
416
417 =item do_sql
418
419 Executes a SQL string or a code reference that returns a SQL string. This is
420 what L</on_connect_do> and L</on_disconnect_do> use.
421
422 It can take:
423
424 =over
425
426 =item a scalar
427
428 Will execute the scalar as SQL.
429
430 =item an arrayref
431
432 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
433 attributes hashref and bind values.
434
435 =item a code reference
436
437 Will execute C<< $code->($storage) >> and execute the return array refs as
438 above.
439
440 =back
441
442 =item datetime_setup
443
444 Execute any statements necessary to initialize the database session to return
445 and accept datetime/timestamp values used with
446 L<DBIx::Class::InflateColumn::DateTime>.
447
448 Only necessary for some databases, see your specific storage driver for
449 implementation details.
450
451 =back
452
453 =item on_disconnect_call
454
455 Takes arguments in the same form as L</on_connect_call> and executes them
456 immediately before disconnecting from the database.
457
458 Calls the C<disconnect_call_METHOD> methods as opposed to the
459 C<connect_call_METHOD> methods called by L</on_connect_call>.
460
461 Note, this only runs if you explicitly call L</disconnect> on the
462 storage object.
463
464 =item disable_sth_caching
465
466 If set to a true value, this option will disable the caching of
467 statement handles via L<DBI/prepare_cached>.
468
469 =item limit_dialect
470
471 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
472 default L</sql_limit_dialect> setting of the storage (if any). For a list
473 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
474
475 =item quote_names
476
477 When true automatically sets L</quote_char> and L</name_sep> to the characters
478 appropriate for your particular RDBMS. This option is preferred over specifying
479 L</quote_char> directly.
480
481 =item quote_char
482
483 Specifies what characters to use to quote table and column names.
484
485 C<quote_char> expects either a single character, in which case is it
486 is placed on either side of the table/column name, or an arrayref of length
487 2 in which case the table/column name is placed between the elements.
488
489 For example under MySQL you should use C<< quote_char => '`' >>, and for
490 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
491
492 =item name_sep
493
494 This parameter is only useful in conjunction with C<quote_char>, and is used to
495 specify the character that separates elements (schemas, tables, columns) from
496 each other. If unspecified it defaults to the most commonly used C<.>.
497
498 =item unsafe
499
500 This Storage driver normally installs its own C<HandleError>, sets
501 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
502 all database handles, including those supplied by a coderef.  It does this
503 so that it can have consistent and useful error behavior.
504
505 If you set this option to a true value, Storage will not do its usual
506 modifications to the database handle's attributes, and instead relies on
507 the settings in your connect_info DBI options (or the values you set in
508 your connection coderef, in the case that you are connecting via coderef).
509
510 Note that your custom settings can cause Storage to malfunction,
511 especially if you set a C<HandleError> handler that suppresses exceptions
512 and/or disable C<RaiseError>.
513
514 =item auto_savepoint
515
516 If this option is true, L<DBIx::Class> will use savepoints when nesting
517 transactions, making it possible to recover from failure in the inner
518 transaction without having to abort all outer transactions.
519
520 =item cursor_class
521
522 Use this argument to supply a cursor class other than the default
523 L<DBIx::Class::Storage::DBI::Cursor>.
524
525 =back
526
527 Some real-life examples of arguments to L</connect_info> and
528 L<DBIx::Class::Schema/connect>
529
530   # Simple SQLite connection
531   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
532
533   # Connect via subref
534   ->connect_info([ sub { DBI->connect(...) } ]);
535
536   # Connect via subref in hashref
537   ->connect_info([{
538     dbh_maker => sub { DBI->connect(...) },
539     on_connect_do => 'alter session ...',
540   }]);
541
542   # A bit more complicated
543   ->connect_info(
544     [
545       'dbi:Pg:dbname=foo',
546       'postgres',
547       'my_pg_password',
548       { AutoCommit => 1 },
549       { quote_char => q{"} },
550     ]
551   );
552
553   # Equivalent to the previous example
554   ->connect_info(
555     [
556       'dbi:Pg:dbname=foo',
557       'postgres',
558       'my_pg_password',
559       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
560     ]
561   );
562
563   # Same, but with hashref as argument
564   # See parse_connect_info for explanation
565   ->connect_info(
566     [{
567       dsn         => 'dbi:Pg:dbname=foo',
568       user        => 'postgres',
569       password    => 'my_pg_password',
570       AutoCommit  => 1,
571       quote_char  => q{"},
572       name_sep    => q{.},
573     }]
574   );
575
576   # Subref + DBIx::Class-specific connection options
577   ->connect_info(
578     [
579       sub { DBI->connect(...) },
580       {
581           quote_char => q{`},
582           name_sep => q{@},
583           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
584           disable_sth_caching => 1,
585       },
586     ]
587   );
588
589
590
591 =cut
592
593 sub connect_info {
594   my ($self, $info) = @_;
595
596   return $self->_connect_info if !$info;
597
598   $self->_connect_info($info); # copy for _connect_info
599
600   $info = $self->_normalize_connect_info($info)
601     if ref $info eq 'ARRAY';
602
603   for my $storage_opt (keys %{ $info->{storage_options} }) {
604     my $value = $info->{storage_options}{$storage_opt};
605
606     $self->$storage_opt($value);
607   }
608
609   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
610   #  the new set of options
611   $self->_sql_maker(undef);
612   $self->_sql_maker_opts({});
613
614   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
615     my $value = $info->{sql_maker_options}{$sql_maker_opt};
616
617     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
618   }
619
620   my %attrs = (
621     %{ $self->_default_dbi_connect_attributes || {} },
622     %{ $info->{attributes} || {} },
623   );
624
625   my @args = @{ $info->{arguments} };
626
627   if (keys %attrs and ref $args[0] ne 'CODE') {
628     carp
629         'You provided explicit AutoCommit => 0 in your connection_info. '
630       . 'This is almost universally a bad idea (see the footnotes of '
631       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
632       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
633       . 'this warning.'
634       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
635
636     push @args, \%attrs if keys %attrs;
637   }
638   $self->_dbi_connect_info(\@args);
639
640   # FIXME - dirty:
641   # save attributes them in a separate accessor so they are always
642   # introspectable, even in case of a CODE $dbhmaker
643   $self->_dbic_connect_attributes (\%attrs);
644
645   return $self->_connect_info;
646 }
647
648 sub _normalize_connect_info {
649   my ($self, $info_arg) = @_;
650   my %info;
651
652   my @args = @$info_arg;  # take a shallow copy for further mutilation
653
654   # combine/pre-parse arguments depending on invocation style
655
656   my %attrs;
657   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
658     %attrs = %{ $args[1] || {} };
659     @args = $args[0];
660   }
661   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
662     %attrs = %{$args[0]};
663     @args = ();
664     if (my $code = delete $attrs{dbh_maker}) {
665       @args = $code;
666
667       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
668       if (@ignored) {
669         carp sprintf (
670             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
671           . "to the result of 'dbh_maker'",
672
673           join (', ', map { "'$_'" } (@ignored) ),
674         );
675       }
676     }
677     else {
678       @args = delete @attrs{qw/dsn user password/};
679     }
680   }
681   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
682     %attrs = (
683       % { $args[3] || {} },
684       % { $args[4] || {} },
685     );
686     @args = @args[0,1,2];
687   }
688
689   $info{arguments} = \@args;
690
691   my @storage_opts = grep exists $attrs{$_},
692     @storage_options, 'cursor_class';
693
694   @{ $info{storage_options} }{@storage_opts} =
695     delete @attrs{@storage_opts} if @storage_opts;
696
697   my @sql_maker_opts = grep exists $attrs{$_},
698     qw/limit_dialect quote_char name_sep quote_names/;
699
700   @{ $info{sql_maker_options} }{@sql_maker_opts} =
701     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
702
703   $info{attributes} = \%attrs if %attrs;
704
705   return \%info;
706 }
707
708 sub _default_dbi_connect_attributes () {
709   +{
710     AutoCommit => 1,
711     PrintError => 0,
712     RaiseError => 1,
713     ShowErrorStatement => 1,
714   };
715 }
716
717 =head2 on_connect_do
718
719 This method is deprecated in favour of setting via L</connect_info>.
720
721 =cut
722
723 =head2 on_disconnect_do
724
725 This method is deprecated in favour of setting via L</connect_info>.
726
727 =cut
728
729 sub _parse_connect_do {
730   my ($self, $type) = @_;
731
732   my $val = $self->$type;
733   return () if not defined $val;
734
735   my @res;
736
737   if (not ref($val)) {
738     push @res, [ 'do_sql', $val ];
739   } elsif (ref($val) eq 'CODE') {
740     push @res, $val;
741   } elsif (ref($val) eq 'ARRAY') {
742     push @res, map { [ 'do_sql', $_ ] } @$val;
743   } else {
744     $self->throw_exception("Invalid type for $type: ".ref($val));
745   }
746
747   return \@res;
748 }
749
750 =head2 dbh_do
751
752 Arguments: ($subref | $method_name), @extra_coderef_args?
753
754 Execute the given $subref or $method_name using the new exception-based
755 connection management.
756
757 The first two arguments will be the storage object that C<dbh_do> was called
758 on and a database handle to use.  Any additional arguments will be passed
759 verbatim to the called subref as arguments 2 and onwards.
760
761 Using this (instead of $self->_dbh or $self->dbh) ensures correct
762 exception handling and reconnection (or failover in future subclasses).
763
764 Your subref should have no side-effects outside of the database, as
765 there is the potential for your subref to be partially double-executed
766 if the database connection was stale/dysfunctional.
767
768 Example:
769
770   my @stuff = $schema->storage->dbh_do(
771     sub {
772       my ($storage, $dbh, @cols) = @_;
773       my $cols = join(q{, }, @cols);
774       $dbh->selectrow_array("SELECT $cols FROM foo");
775     },
776     @column_list
777   );
778
779 =cut
780
781 sub dbh_do {
782   my $self = shift;
783   my $run_target = shift;
784
785   # short circuit when we know there is no need for a runner
786   #
787   # FIXME - asumption may be wrong
788   # the rationale for the txn_depth check is that if this block is a part
789   # of a larger transaction, everything up to that point is screwed anyway
790   return $self->$run_target($self->_get_dbh, @_)
791     if $self->{_in_do_block} or $self->transaction_depth;
792
793   my $args = \@_;
794
795   DBIx::Class::Storage::BlockRunner->new(
796     storage => $self,
797     run_code => sub { $self->$run_target ($self->_get_dbh, @$args ) },
798     wrap_txn => 0,
799     retry_handler => sub { ! ( $_[0]->retried_count or $_[0]->storage->connected ) },
800   )->run;
801 }
802
803 sub txn_do {
804   $_[0]->_get_dbh; # connects or reconnects on pid change, necessary to grab correct txn_depth
805   shift->next::method(@_);
806 }
807
808 =head2 disconnect
809
810 Our C<disconnect> method also performs a rollback first if the
811 database is not in C<AutoCommit> mode.
812
813 =cut
814
815 sub disconnect {
816   my ($self) = @_;
817
818   if( $self->_dbh ) {
819     my @actions;
820
821     push @actions, ( $self->on_disconnect_call || () );
822     push @actions, $self->_parse_connect_do ('on_disconnect_do');
823
824     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
825
826     # stops the "implicit rollback on disconnect" warning
827     $self->_exec_txn_rollback unless $self->_dbh_autocommit;
828
829     %{ $self->_dbh->{CachedKids} } = ();
830     $self->_dbh->disconnect;
831     $self->_dbh(undef);
832     $self->{_dbh_gen}++;
833   }
834 }
835
836 =head2 with_deferred_fk_checks
837
838 =over 4
839
840 =item Arguments: C<$coderef>
841
842 =item Return Value: The return value of $coderef
843
844 =back
845
846 Storage specific method to run the code ref with FK checks deferred or
847 in MySQL's case disabled entirely.
848
849 =cut
850
851 # Storage subclasses should override this
852 sub with_deferred_fk_checks {
853   my ($self, $sub) = @_;
854   $sub->();
855 }
856
857 =head2 connected
858
859 =over
860
861 =item Arguments: none
862
863 =item Return Value: 1|0
864
865 =back
866
867 Verifies that the current database handle is active and ready to execute
868 an SQL statement (e.g. the connection did not get stale, server is still
869 answering, etc.) This method is used internally by L</dbh>.
870
871 =cut
872
873 sub connected {
874   my $self = shift;
875   return 0 unless $self->_seems_connected;
876
877   #be on the safe side
878   local $self->_dbh->{RaiseError} = 1;
879
880   return $self->_ping;
881 }
882
883 sub _seems_connected {
884   my $self = shift;
885
886   $self->_verify_pid;
887
888   my $dbh = $self->_dbh
889     or return 0;
890
891   return $dbh->FETCH('Active');
892 }
893
894 sub _ping {
895   my $self = shift;
896
897   my $dbh = $self->_dbh or return 0;
898
899   return $dbh->ping;
900 }
901
902 sub ensure_connected {
903   my ($self) = @_;
904
905   unless ($self->connected) {
906     $self->_populate_dbh;
907   }
908 }
909
910 =head2 dbh
911
912 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
913 is guaranteed to be healthy by implicitly calling L</connected>, and if
914 necessary performing a reconnection before returning. Keep in mind that this
915 is very B<expensive> on some database engines. Consider using L</dbh_do>
916 instead.
917
918 =cut
919
920 sub dbh {
921   my ($self) = @_;
922
923   if (not $self->_dbh) {
924     $self->_populate_dbh;
925   } else {
926     $self->ensure_connected;
927   }
928   return $self->_dbh;
929 }
930
931 # this is the internal "get dbh or connect (don't check)" method
932 sub _get_dbh {
933   my $self = shift;
934   $self->_verify_pid;
935   $self->_populate_dbh unless $self->_dbh;
936   return $self->_dbh;
937 }
938
939 sub sql_maker {
940   my ($self) = @_;
941   unless ($self->_sql_maker) {
942     my $sql_maker_class = $self->sql_maker_class;
943
944     my %opts = %{$self->_sql_maker_opts||{}};
945     my $dialect =
946       $opts{limit_dialect}
947         ||
948       $self->sql_limit_dialect
949         ||
950       do {
951         my $s_class = (ref $self) || $self;
952         carp (
953           "Your storage class ($s_class) does not set sql_limit_dialect and you "
954         . 'have not supplied an explicit limit_dialect in your connection_info. '
955         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
956         . 'databases but can be (and often is) painfully slow. '
957         . "Please file an RT ticket against '$s_class' ."
958         );
959
960         'GenericSubQ';
961       }
962     ;
963
964     my ($quote_char, $name_sep);
965
966     if ($opts{quote_names}) {
967       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
968         my $s_class = (ref $self) || $self;
969         carp (
970           "You requested 'quote_names' but your storage class ($s_class) does "
971         . 'not explicitly define a default sql_quote_char and you have not '
972         . 'supplied a quote_char as part of your connection_info. DBIC will '
973         .q{default to the ANSI SQL standard quote '"', which works most of }
974         . "the time. Please file an RT ticket against '$s_class'."
975         );
976
977         '"'; # RV
978       };
979
980       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
981     }
982
983     $self->_sql_maker($sql_maker_class->new(
984       bindtype=>'columns',
985       array_datatypes => 1,
986       limit_dialect => $dialect,
987       ($quote_char ? (quote_char => $quote_char) : ()),
988       name_sep => ($name_sep || '.'),
989       %opts,
990     ));
991   }
992   return $self->_sql_maker;
993 }
994
995 # nothing to do by default
996 sub _rebless {}
997 sub _init {}
998
999 sub _populate_dbh {
1000   my ($self) = @_;
1001
1002   my @info = @{$self->_dbi_connect_info || []};
1003   $self->_dbh(undef); # in case ->connected failed we might get sent here
1004   $self->_dbh_details({}); # reset everything we know
1005
1006   $self->_dbh($self->_connect(@info));
1007
1008   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1009
1010   $self->_determine_driver;
1011
1012   # Always set the transaction depth on connect, since
1013   #  there is no transaction in progress by definition
1014   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1015
1016   $self->_run_connection_actions unless $self->{_in_determine_driver};
1017 }
1018
1019 sub _run_connection_actions {
1020   my $self = shift;
1021   my @actions;
1022
1023   push @actions, ( $self->on_connect_call || () );
1024   push @actions, $self->_parse_connect_do ('on_connect_do');
1025
1026   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1027 }
1028
1029
1030
1031 sub set_use_dbms_capability {
1032   $_[0]->set_inherited ($_[1], $_[2]);
1033 }
1034
1035 sub get_use_dbms_capability {
1036   my ($self, $capname) = @_;
1037
1038   my $use = $self->get_inherited ($capname);
1039   return defined $use
1040     ? $use
1041     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1042   ;
1043 }
1044
1045 sub set_dbms_capability {
1046   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1047 }
1048
1049 sub get_dbms_capability {
1050   my ($self, $capname) = @_;
1051
1052   my $cap = $self->_dbh_details->{capability}{$capname};
1053
1054   unless (defined $cap) {
1055     if (my $meth = $self->can ("_determine$capname")) {
1056       $cap = $self->$meth ? 1 : 0;
1057     }
1058     else {
1059       $cap = 0;
1060     }
1061
1062     $self->set_dbms_capability ($capname, $cap);
1063   }
1064
1065   return $cap;
1066 }
1067
1068 sub _server_info {
1069   my $self = shift;
1070
1071   my $info;
1072   unless ($info = $self->_dbh_details->{info}) {
1073
1074     $info = {};
1075
1076     my $server_version = try { $self->_get_server_version };
1077
1078     if (defined $server_version) {
1079       $info->{dbms_version} = $server_version;
1080
1081       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1082       my @verparts = split (/\./, $numeric_version);
1083       if (
1084         @verparts
1085           &&
1086         $verparts[0] <= 999
1087       ) {
1088         # consider only up to 3 version parts, iff not more than 3 digits
1089         my @use_parts;
1090         while (@verparts && @use_parts < 3) {
1091           my $p = shift @verparts;
1092           last if $p > 999;
1093           push @use_parts, $p;
1094         }
1095         push @use_parts, 0 while @use_parts < 3;
1096
1097         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1098       }
1099     }
1100
1101     $self->_dbh_details->{info} = $info;
1102   }
1103
1104   return $info;
1105 }
1106
1107 sub _get_server_version {
1108   shift->_dbh_get_info(18);
1109 }
1110
1111 sub _dbh_get_info {
1112   my ($self, $info) = @_;
1113
1114   return try { $self->_get_dbh->get_info($info) } || undef;
1115 }
1116
1117 sub _determine_driver {
1118   my ($self) = @_;
1119
1120   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1121     my $started_connected = 0;
1122     local $self->{_in_determine_driver} = 1;
1123
1124     if (ref($self) eq __PACKAGE__) {
1125       my $driver;
1126       if ($self->_dbh) { # we are connected
1127         $driver = $self->_dbh->{Driver}{Name};
1128         $started_connected = 1;
1129       } else {
1130         # if connect_info is a CODEREF, we have no choice but to connect
1131         if (ref $self->_dbi_connect_info->[0] &&
1132             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1133           $self->_populate_dbh;
1134           $driver = $self->_dbh->{Driver}{Name};
1135         }
1136         else {
1137           # try to use dsn to not require being connected, the driver may still
1138           # force a connection in _rebless to determine version
1139           # (dsn may not be supplied at all if all we do is make a mock-schema)
1140           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1141           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1142           $driver ||= $ENV{DBI_DRIVER};
1143         }
1144       }
1145
1146       if ($driver) {
1147         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1148         if ($self->load_optional_class($storage_class)) {
1149           mro::set_mro($storage_class, 'c3');
1150           bless $self, $storage_class;
1151           $self->_rebless();
1152         }
1153       }
1154     }
1155
1156     $self->_driver_determined(1);
1157
1158     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1159
1160     $self->_init; # run driver-specific initializations
1161
1162     $self->_run_connection_actions
1163         if !$started_connected && defined $self->_dbh;
1164   }
1165 }
1166
1167 sub _do_connection_actions {
1168   my $self          = shift;
1169   my $method_prefix = shift;
1170   my $call          = shift;
1171
1172   if (not ref($call)) {
1173     my $method = $method_prefix . $call;
1174     $self->$method(@_);
1175   } elsif (ref($call) eq 'CODE') {
1176     $self->$call(@_);
1177   } elsif (ref($call) eq 'ARRAY') {
1178     if (ref($call->[0]) ne 'ARRAY') {
1179       $self->_do_connection_actions($method_prefix, $_) for @$call;
1180     } else {
1181       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1182     }
1183   } else {
1184     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1185   }
1186
1187   return $self;
1188 }
1189
1190 sub connect_call_do_sql {
1191   my $self = shift;
1192   $self->_do_query(@_);
1193 }
1194
1195 sub disconnect_call_do_sql {
1196   my $self = shift;
1197   $self->_do_query(@_);
1198 }
1199
1200 # override in db-specific backend when necessary
1201 sub connect_call_datetime_setup { 1 }
1202
1203 sub _do_query {
1204   my ($self, $action) = @_;
1205
1206   if (ref $action eq 'CODE') {
1207     $action = $action->($self);
1208     $self->_do_query($_) foreach @$action;
1209   }
1210   else {
1211     # Most debuggers expect ($sql, @bind), so we need to exclude
1212     # the attribute hash which is the second argument to $dbh->do
1213     # furthermore the bind values are usually to be presented
1214     # as named arrayref pairs, so wrap those here too
1215     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1216     my $sql = shift @do_args;
1217     my $attrs = shift @do_args;
1218     my @bind = map { [ undef, $_ ] } @do_args;
1219
1220     $self->dbh_do(sub {
1221       $_[0]->_query_start($sql, \@bind);
1222       $_[1]->do($sql, $attrs, @do_args);
1223       $_[0]->_query_end($sql, \@bind);
1224     });
1225   }
1226
1227   return $self;
1228 }
1229
1230 sub _connect {
1231   my ($self, @info) = @_;
1232
1233   $self->throw_exception("You failed to provide any connection info")
1234     if !@info;
1235
1236   my ($old_connect_via, $dbh);
1237
1238   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1239     $old_connect_via = $DBI::connect_via;
1240     $DBI::connect_via = 'connect';
1241   }
1242
1243   try {
1244     if(ref $info[0] eq 'CODE') {
1245       $dbh = $info[0]->();
1246     }
1247     else {
1248       require DBI;
1249       $dbh = DBI->connect(@info);
1250     }
1251
1252     if (!$dbh) {
1253       die $DBI::errstr;
1254     }
1255
1256     unless ($self->unsafe) {
1257
1258       $self->throw_exception(
1259         'Refusing clobbering of {HandleError} installed on externally supplied '
1260        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1261       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1262
1263       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1264       # request, or an external handle. Complain and set anyway
1265       unless ($dbh->{RaiseError}) {
1266         carp( ref $info[0] eq 'CODE'
1267
1268           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1269            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1270            .'attribute has been supplied'
1271
1272           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1273            .'unsafe => 1. Toggling RaiseError back to true'
1274         );
1275
1276         $dbh->{RaiseError} = 1;
1277       }
1278
1279       # this odd anonymous coderef dereference is in fact really
1280       # necessary to avoid the unwanted effect described in perl5
1281       # RT#75792
1282       sub {
1283         my $weak_self = $_[0];
1284         weaken $weak_self;
1285
1286         # the coderef is blessed so we can distinguish it from externally
1287         # supplied handles (which must be preserved)
1288         $_[1]->{HandleError} = bless sub {
1289           if ($weak_self) {
1290             $weak_self->throw_exception("DBI Exception: $_[0]");
1291           }
1292           else {
1293             # the handler may be invoked by something totally out of
1294             # the scope of DBIC
1295             DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1296           }
1297         }, '__DBIC__DBH__ERROR__HANDLER__';
1298       }->($self, $dbh);
1299     }
1300   }
1301   catch {
1302     $self->throw_exception("DBI Connection failed: $_")
1303   }
1304   finally {
1305     $DBI::connect_via = $old_connect_via if $old_connect_via;
1306   };
1307
1308   $self->_dbh_autocommit($dbh->{AutoCommit});
1309   $dbh;
1310 }
1311
1312 sub txn_begin {
1313   my $self = shift;
1314
1315   # this means we have not yet connected and do not know the AC status
1316   # (e.g. coderef $dbh), need a full-fledged connection check
1317   if (! defined $self->_dbh_autocommit) {
1318     $self->ensure_connected;
1319   }
1320   # Otherwise simply connect or re-connect on pid changes
1321   else {
1322     $self->_get_dbh;
1323   }
1324
1325   $self->next::method(@_);
1326 }
1327
1328 sub _exec_txn_begin {
1329   my $self = shift;
1330
1331   # if the user is utilizing txn_do - good for him, otherwise we need to
1332   # ensure that the $dbh is healthy on BEGIN.
1333   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1334   # will be replaced by a failure of begin_work itself (which will be
1335   # then retried on reconnect)
1336   if ($self->{_in_do_block}) {
1337     $self->_dbh->begin_work;
1338   } else {
1339     $self->dbh_do(sub { $_[1]->begin_work });
1340   }
1341 }
1342
1343 sub txn_commit {
1344   my $self = shift;
1345
1346   $self->_verify_pid if $self->_dbh;
1347   $self->throw_exception("Unable to txn_commit() on a disconnected storage")
1348     unless $self->_dbh;
1349
1350   # esoteric case for folks using external $dbh handles
1351   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1352     carp "Storage transaction_depth 0 does not match "
1353         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1354     $self->transaction_depth(1);
1355   }
1356
1357   $self->next::method(@_);
1358
1359   # if AutoCommit is disabled txn_depth never goes to 0
1360   # as a new txn is started immediately on commit
1361   $self->transaction_depth(1) if (
1362     !$self->transaction_depth
1363       and
1364     defined $self->_dbh_autocommit
1365       and
1366     ! $self->_dbh_autocommit
1367   );
1368 }
1369
1370 sub _exec_txn_commit {
1371   shift->_dbh->commit;
1372 }
1373
1374 sub txn_rollback {
1375   my $self = shift;
1376
1377   $self->_verify_pid if $self->_dbh;
1378   $self->throw_exception("Unable to txn_rollback() on a disconnected storage")
1379     unless $self->_dbh;
1380
1381   # esoteric case for folks using external $dbh handles
1382   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1383     carp "Storage transaction_depth 0 does not match "
1384         ."false AutoCommit of $self->{_dbh}, attempting ROLLBACK anyway";
1385     $self->transaction_depth(1);
1386   }
1387
1388   $self->next::method(@_);
1389
1390   # if AutoCommit is disabled txn_depth never goes to 0
1391   # as a new txn is started immediately on commit
1392   $self->transaction_depth(1) if (
1393     !$self->transaction_depth
1394       and
1395     defined $self->_dbh_autocommit
1396       and
1397     ! $self->_dbh_autocommit
1398   );
1399 }
1400
1401 sub _exec_txn_rollback {
1402   shift->_dbh->rollback;
1403 }
1404
1405 # generate some identical methods
1406 for my $meth (qw/svp_begin svp_release svp_rollback/) {
1407   no strict qw/refs/;
1408   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
1409     my $self = shift;
1410     $self->_verify_pid if $self->_dbh;
1411     $self->throw_exception("Unable to $meth() on a disconnected storage")
1412       unless $self->_dbh;
1413     $self->next::method(@_);
1414   };
1415 }
1416
1417 # This used to be the top-half of _execute.  It was split out to make it
1418 #  easier to override in NoBindVars without duping the rest.  It takes up
1419 #  all of _execute's args, and emits $sql, @bind.
1420 sub _prep_for_execute {
1421   #my ($self, $op, $ident, $args) = @_;
1422   return shift->_gen_sql_bind(@_)
1423 }
1424
1425 sub _gen_sql_bind {
1426   my ($self, $op, $ident, $args) = @_;
1427
1428   my ($sql, @bind) = $self->sql_maker->$op(
1429     blessed($ident) ? $ident->from : $ident,
1430     @$args,
1431   );
1432
1433   if (
1434     ! $ENV{DBIC_DT_SEARCH_OK}
1435       and
1436     $op eq 'select'
1437       and
1438     first { blessed($_->[1]) && $_->[1]->isa('DateTime') } @bind
1439   ) {
1440     carp_unique 'DateTime objects passed to search() are not supported '
1441       . 'properly (InflateColumn::DateTime formats and settings are not '
1442       . 'respected.) See "Formatting DateTime objects in queries" in '
1443       . 'DBIx::Class::Manual::Cookbook. To disable this warning for good '
1444       . 'set $ENV{DBIC_DT_SEARCH_OK} to true'
1445   }
1446
1447   return( $sql, $self->_resolve_bindattrs(
1448     $ident, [ @{$args->[2]{bind}||[]}, @bind ]
1449   ));
1450 }
1451
1452 sub _resolve_bindattrs {
1453   my ($self, $ident, $bind, $colinfos) = @_;
1454
1455   $colinfos ||= {};
1456
1457   my $resolve_bindinfo = sub {
1458     #my $infohash = shift;
1459
1460     %$colinfos = %{ $self->_resolve_column_info($ident) }
1461       unless keys %$colinfos;
1462
1463     my $ret;
1464     if (my $col = $_[0]->{dbic_colname}) {
1465       $ret = { %{$_[0]} };
1466
1467       $ret->{sqlt_datatype} ||= $colinfos->{$col}{data_type}
1468         if $colinfos->{$col}{data_type};
1469
1470       $ret->{sqlt_size} ||= $colinfos->{$col}{size}
1471         if $colinfos->{$col}{size};
1472     }
1473
1474     $ret || $_[0];
1475   };
1476
1477   return [ map {
1478     if (ref $_ ne 'ARRAY') {
1479       [{}, $_]
1480     }
1481     elsif (! defined $_->[0]) {
1482       [{}, $_->[1]]
1483     }
1484     elsif (ref $_->[0] eq 'HASH') {
1485       [
1486         ($_->[0]{dbd_attrs} or $_->[0]{sqlt_datatype}) ? $_->[0] : $resolve_bindinfo->($_->[0]),
1487         $_->[1]
1488       ]
1489     }
1490     elsif (ref $_->[0] eq 'SCALAR') {
1491       [ { sqlt_datatype => ${$_->[0]} }, $_->[1] ]
1492     }
1493     else {
1494       [ $resolve_bindinfo->({ dbic_colname => $_->[0] }), $_->[1] ]
1495     }
1496   } @$bind ];
1497 }
1498
1499 sub _format_for_trace {
1500   #my ($self, $bind) = @_;
1501
1502   ### Turn @bind from something like this:
1503   ###   ( [ "artist", 1 ], [ \%attrs, 3 ] )
1504   ### to this:
1505   ###   ( "'1'", "'3'" )
1506
1507   map {
1508     defined( $_ && $_->[1] )
1509       ? qq{'$_->[1]'}
1510       : q{NULL}
1511   } @{$_[1] || []};
1512 }
1513
1514 sub _query_start {
1515   my ( $self, $sql, $bind ) = @_;
1516
1517   $self->debugobj->query_start( $sql, $self->_format_for_trace($bind) )
1518     if $self->debug;
1519 }
1520
1521 sub _query_end {
1522   my ( $self, $sql, $bind ) = @_;
1523
1524   $self->debugobj->query_end( $sql, $self->_format_for_trace($bind) )
1525     if $self->debug;
1526 }
1527
1528 my $sba_compat;
1529 sub _dbi_attrs_for_bind {
1530   my ($self, $ident, $bind) = @_;
1531
1532   if (! defined $sba_compat) {
1533     $self->_determine_driver;
1534     $sba_compat = $self->can('source_bind_attributes') == \&source_bind_attributes
1535       ? 0
1536       : 1
1537     ;
1538   }
1539
1540   my $sba_attrs;
1541   if ($sba_compat) {
1542     my $class = ref $self;
1543     carp_unique (
1544       "The source_bind_attributes() override in $class relies on a deprecated codepath. "
1545      .'You are strongly advised to switch your code to override bind_attribute_by_datatype() '
1546      .'instead. This legacy compat shim will also disappear some time before DBIC 0.09'
1547     );
1548
1549     my $sba_attrs = $self->source_bind_attributes
1550   }
1551
1552   my @attrs;
1553
1554   for (map { $_->[0] } @$bind) {
1555     push @attrs, do {
1556       if (exists $_->{dbd_attrs}) {
1557         $_->{dbd_attrs}
1558       }
1559       elsif($_->{sqlt_datatype}) {
1560         # cache the result in the dbh_details hash, as it can not change unless
1561         # we connect to something else
1562         my $cache = $self->_dbh_details->{_datatype_map_cache} ||= {};
1563         if (not exists $cache->{$_->{sqlt_datatype}}) {
1564           $cache->{$_->{sqlt_datatype}} = $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
1565         }
1566         $cache->{$_->{sqlt_datatype}};
1567       }
1568       elsif ($sba_attrs and $_->{dbic_colname}) {
1569         $sba_attrs->{$_->{dbic_colname}} || undef;
1570       }
1571       else {
1572         undef;  # always push something at this position
1573       }
1574     }
1575   }
1576
1577   return \@attrs;
1578 }
1579
1580 sub _execute {
1581   my ($self, $op, $ident, @args) = @_;
1582
1583   my ($sql, $bind) = $self->_prep_for_execute($op, $ident, \@args);
1584
1585   shift->dbh_do(    # retry over disconnects
1586     '_dbh_execute',
1587     $sql,
1588     $bind,
1589     $self->_dbi_attrs_for_bind($ident, $bind)
1590   );
1591 }
1592
1593 sub _dbh_execute {
1594   my ($self, undef, $sql, $bind, $bind_attrs) = @_;
1595
1596   $self->_query_start( $sql, $bind );
1597   my $sth = $self->_sth($sql);
1598
1599   for my $i (0 .. $#$bind) {
1600     if (ref $bind->[$i][1] eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1601       $sth->bind_param_inout(
1602         $i + 1, # bind params counts are 1-based
1603         $bind->[$i][1],
1604         $bind->[$i][0]{dbd_size} || $self->_max_column_bytesize($bind->[$i][0]), # size
1605         $bind_attrs->[$i],
1606       );
1607     }
1608     else {
1609       $sth->bind_param(
1610         $i + 1,
1611         (ref $bind->[$i][1] and overload::Method($bind->[$i][1], '""'))
1612           ? "$bind->[$i][1]"
1613           : $bind->[$i][1]
1614         ,
1615         $bind_attrs->[$i],
1616       );
1617     }
1618   }
1619
1620   # Can this fail without throwing an exception anyways???
1621   my $rv = $sth->execute();
1622   $self->throw_exception(
1623     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1624   ) if !$rv;
1625
1626   $self->_query_end( $sql, $bind );
1627
1628   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1629 }
1630
1631 sub _prefetch_autovalues {
1632   my ($self, $source, $to_insert) = @_;
1633
1634   my $colinfo = $source->columns_info;
1635
1636   my %values;
1637   for my $col (keys %$colinfo) {
1638     if (
1639       $colinfo->{$col}{auto_nextval}
1640         and
1641       (
1642         ! exists $to_insert->{$col}
1643           or
1644         ref $to_insert->{$col} eq 'SCALAR'
1645           or
1646         (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1647       )
1648     ) {
1649       $values{$col} = $self->_sequence_fetch(
1650         'NEXTVAL',
1651         ( $colinfo->{$col}{sequence} ||=
1652             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1653         ),
1654       );
1655     }
1656   }
1657
1658   \%values;
1659 }
1660
1661 sub insert {
1662   my ($self, $source, $to_insert) = @_;
1663
1664   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1665
1666   # fuse the values, but keep a separate list of prefetched_values so that
1667   # they can be fused once again with the final return
1668   $to_insert = { %$to_insert, %$prefetched_values };
1669
1670   # FIXME - we seem to assume undef values as non-supplied. This is wrong.
1671   # Investigate what does it take to s/defined/exists/
1672   my $col_infos = $source->columns_info;
1673   my %pcols = map { $_ => 1 } $source->primary_columns;
1674   my (%retrieve_cols, $autoinc_supplied, $retrieve_autoinc_col);
1675   for my $col ($source->columns) {
1676     if ($col_infos->{$col}{is_auto_increment}) {
1677       $autoinc_supplied ||= 1 if defined $to_insert->{$col};
1678       $retrieve_autoinc_col ||= $col unless $autoinc_supplied;
1679     }
1680
1681     # nothing to retrieve when explicit values are supplied
1682     next if (defined $to_insert->{$col} and ! (
1683       ref $to_insert->{$col} eq 'SCALAR'
1684         or
1685       (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1686     ));
1687
1688     # the 'scalar keys' is a trick to preserve the ->columns declaration order
1689     $retrieve_cols{$col} = scalar keys %retrieve_cols if (
1690       $pcols{$col}
1691         or
1692       $col_infos->{$col}{retrieve_on_insert}
1693     );
1694   };
1695
1696   local $self->{_autoinc_supplied_for_op} = $autoinc_supplied;
1697   local $self->{_perform_autoinc_retrieval} = $retrieve_autoinc_col;
1698
1699   my ($sqla_opts, @ir_container);
1700   if (%retrieve_cols and $self->_use_insert_returning) {
1701     $sqla_opts->{returning_container} = \@ir_container
1702       if $self->_use_insert_returning_bound;
1703
1704     $sqla_opts->{returning} = [
1705       sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
1706     ];
1707   }
1708
1709   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
1710
1711   my %returned_cols = %$to_insert;
1712   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
1713     @ir_container = try {
1714       local $SIG{__WARN__} = sub {};
1715       my @r = $sth->fetchrow_array;
1716       $sth->finish;
1717       @r;
1718     } unless @ir_container;
1719
1720     @returned_cols{@$retlist} = @ir_container if @ir_container;
1721   }
1722   else {
1723     # pull in PK if needed and then everything else
1724     if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
1725
1726       $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
1727         unless $self->can('last_insert_id');
1728
1729       my @pri_values = $self->last_insert_id($source, @missing_pri);
1730
1731       $self->throw_exception( "Can't get last insert id" )
1732         unless (@pri_values == @missing_pri);
1733
1734       @returned_cols{@missing_pri} = @pri_values;
1735       delete $retrieve_cols{$_} for @missing_pri;
1736     }
1737
1738     # if there is more left to pull
1739     if (%retrieve_cols) {
1740       $self->throw_exception(
1741         'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
1742       ) unless %pcols;
1743
1744       my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
1745
1746       my $cur = DBIx::Class::ResultSet->new($source, {
1747         where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
1748         select => \@left_to_fetch,
1749       })->cursor;
1750
1751       @returned_cols{@left_to_fetch} = $cur->next;
1752
1753       $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
1754         if scalar $cur->next;
1755     }
1756   }
1757
1758   return { %$prefetched_values, %returned_cols };
1759 }
1760
1761 sub insert_bulk {
1762   my ($self, $source, $cols, $data) = @_;
1763
1764   my @col_range = (0..$#$cols);
1765
1766   # FIXME - perhaps this is not even needed? does DBI stringify?
1767   #
1768   # forcibly stringify whatever is stringifiable
1769   # ResultSet::populate() hands us a copy - safe to mangle
1770   for my $r (0 .. $#$data) {
1771     for my $c (0 .. $#{$data->[$r]}) {
1772       $data->[$r][$c] = "$data->[$r][$c]"
1773         if ( ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
1774     }
1775   }
1776
1777   my $colinfos = $source->columns_info($cols);
1778
1779   local $self->{_autoinc_supplied_for_op} =
1780     (first { $_->{is_auto_increment} } values %$colinfos)
1781       ? 1
1782       : 0
1783   ;
1784
1785   # get a slice type index based on first row of data
1786   # a "column" in this context may refer to more than one bind value
1787   # e.g. \[ '?, ?', [...], [...] ]
1788   #
1789   # construct the value type index - a description of values types for every
1790   # per-column slice of $data:
1791   #
1792   # nonexistent - nonbind literal
1793   # 0 - regular value
1794   # [] of bindattrs - resolved attribute(s) of bind(s) passed via literal+bind \[] combo
1795   #
1796   # also construct the column hash to pass to the SQL generator. For plain
1797   # (non literal) values - convert the members of the first row into a
1798   # literal+bind combo, with extra positional info in the bind attr hashref.
1799   # This will allow us to match the order properly, and is so contrived
1800   # because a user-supplied literal/bind (or something else specific to a
1801   # resultsource and/or storage driver) can inject extra binds along the
1802   # way, so one can't rely on "shift positions" ordering at all. Also we
1803   # can't just hand SQLA a set of some known "values" (e.g. hashrefs that
1804   # can be later matched up by address), because we want to supply a real
1805   # value on which perhaps e.g. datatype checks will be performed
1806   my ($proto_data, $value_type_idx);
1807   for my $i (@col_range) {
1808     my $colname = $cols->[$i];
1809     if (ref $data->[0][$i] eq 'SCALAR') {
1810       # no bind value at all - no type
1811
1812       $proto_data->{$colname} = $data->[0][$i];
1813     }
1814     elsif (ref $data->[0][$i] eq 'REF' and ref ${$data->[0][$i]} eq 'ARRAY' ) {
1815       # repack, so we don't end up mangling the original \[]
1816       my ($sql, @bind) = @${$data->[0][$i]};
1817
1818       # normalization of user supplied stuff
1819       my $resolved_bind = $self->_resolve_bindattrs(
1820         $source, \@bind, $colinfos,
1821       );
1822
1823       # store value-less (attrs only) bind info - we will be comparing all
1824       # supplied binds against this for sanity
1825       $value_type_idx->{$i} = [ map { $_->[0] } @$resolved_bind ];
1826
1827       $proto_data->{$colname} = \[ $sql, map { [
1828         # inject slice order to use for $proto_bind construction
1829           { %{$resolved_bind->[$_][0]}, _bind_data_slice_idx => $i }
1830             =>
1831           $resolved_bind->[$_][1]
1832         ] } (0 .. $#bind)
1833       ];
1834     }
1835     else {
1836       $value_type_idx->{$i} = 0;
1837
1838       $proto_data->{$colname} = \[ '?', [
1839         { dbic_colname => $colname, _bind_data_slice_idx => $i }
1840           =>
1841         $data->[0][$i]
1842       ] ];
1843     }
1844   }
1845
1846   my ($sql, $proto_bind) = $self->_prep_for_execute (
1847     'insert',
1848     $source,
1849     [ $proto_data ],
1850   );
1851
1852   if (! @$proto_bind and keys %$value_type_idx) {
1853     # if the bindlist is empty and we had some dynamic binds, this means the
1854     # storage ate them away (e.g. the NoBindVars component) and interpolated
1855     # them directly into the SQL. This obviously can't be good for multi-inserts
1856     $self->throw_exception('Cannot insert_bulk without support for placeholders');
1857   }
1858
1859   # sanity checks
1860   # FIXME - devise a flag "no babysitting" or somesuch to shut this off
1861   #
1862   # use an error reporting closure for convenience (less to pass)
1863   my $bad_slice_report_cref = sub {
1864     my ($msg, $r_idx, $c_idx) = @_;
1865     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1866       $msg,
1867       $cols->[$c_idx],
1868       do {
1869         require Data::Dumper::Concise;
1870         local $Data::Dumper::Maxdepth = 5;
1871         Data::Dumper::Concise::Dumper ({
1872           map { $cols->[$_] =>
1873             $data->[$r_idx][$_]
1874           } @col_range
1875         }),
1876       }
1877     );
1878   };
1879
1880   for my $col_idx (@col_range) {
1881     my $reference_val = $data->[0][$col_idx];
1882
1883     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
1884       my $val = $data->[$row_idx][$col_idx];
1885
1886       if (! exists $value_type_idx->{$col_idx}) { # literal no binds
1887         if (ref $val ne 'SCALAR') {
1888           $bad_slice_report_cref->(
1889             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
1890             $row_idx,
1891             $col_idx,
1892           );
1893         }
1894         elsif ($$val ne $$reference_val) {
1895           $bad_slice_report_cref->(
1896             "Inconsistent literal SQL value (expecting \\'$$reference_val')",
1897             $row_idx,
1898             $col_idx,
1899           );
1900         }
1901       }
1902       elsif (! $value_type_idx->{$col_idx} ) {  # regular non-literal value
1903         if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
1904           $bad_slice_report_cref->("Literal SQL found where a plain bind value is expected", $row_idx, $col_idx);
1905         }
1906       }
1907       else {  # binds from a \[], compare type and attrs
1908         if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
1909           $bad_slice_report_cref->(
1910             "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
1911             $row_idx,
1912             $col_idx,
1913           );
1914         }
1915         # start drilling down and bail out early on identical refs
1916         elsif (
1917           $reference_val != $val
1918             or
1919           $$reference_val != $$val
1920         ) {
1921           if (${$val}->[0] ne ${$reference_val}->[0]) {
1922             $bad_slice_report_cref->(
1923               "Inconsistent literal/bind SQL (expecting \\['${$reference_val}->[0]', ... ])",
1924               $row_idx,
1925               $col_idx,
1926             );
1927           }
1928           # need to check the bind attrs - a bind will happen only once for
1929           # the entire dataset, so any changes further down will be ignored.
1930           elsif (! Data::Compare::Compare(
1931             $value_type_idx->{$col_idx},
1932             [
1933               map
1934               { $_->[0] }
1935               @{$self->_resolve_bindattrs(
1936                 $source, [ @{$$val}[1 .. $#$$val] ], $colinfos,
1937               )}
1938             ],
1939           )) {
1940             $bad_slice_report_cref->(
1941               'Differing bind attributes on literal/bind values not supported',
1942               $row_idx,
1943               $col_idx,
1944             );
1945           }
1946         }
1947       }
1948     }
1949   }
1950
1951   # neither _dbh_execute_for_fetch, nor _dbh_execute_inserts_with_no_binds
1952   # are atomic (even if execute_for_fetch is a single call). Thus a safety
1953   # scope guard
1954   my $guard = $self->txn_scope_guard;
1955
1956   $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
1957   my $sth = $self->_sth($sql);
1958   my $rv = do {
1959     if (@$proto_bind) {
1960       # proto bind contains the information on which pieces of $data to pull
1961       # $cols is passed in only for prettier error-reporting
1962       $self->_dbh_execute_for_fetch( $source, $sth, $proto_bind, $cols, $data );
1963     }
1964     else {
1965       # bind_param_array doesn't work if there are no binds
1966       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1967     }
1968   };
1969
1970   $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
1971
1972   $guard->commit;
1973
1974   return wantarray ? ($rv, $sth, @$proto_bind) : $rv;
1975 }
1976
1977 # execute_for_fetch is capable of returning data just fine (it means it
1978 # can be used for INSERT...RETURNING and UPDATE...RETURNING. Since this
1979 # is the void-populate fast-path we will just ignore this altogether
1980 # for the time being.
1981 sub _dbh_execute_for_fetch {
1982   my ($self, $source, $sth, $proto_bind, $cols, $data) = @_;
1983
1984   my @idx_range = ( 0 .. $#$proto_bind );
1985
1986   # If we have any bind attributes to take care of, we will bind the
1987   # proto-bind data (which will never be used by execute_for_fetch)
1988   # However since column bindtypes are "sticky", this is sufficient
1989   # to get the DBD to apply the bindtype to all values later on
1990
1991   my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
1992
1993   for my $i (@idx_range) {
1994     $sth->bind_param (
1995       $i+1, # DBI bind indexes are 1-based
1996       $proto_bind->[$i][1],
1997       $bind_attrs->[$i],
1998     ) if defined $bind_attrs->[$i];
1999   }
2000
2001   # At this point $data slots named in the _bind_data_slice_idx of
2002   # each piece of $proto_bind are either \[]s or plain values to be
2003   # passed in. Construct the dispensing coderef. *NOTE* the order
2004   # of $data will differ from this of the ?s in the SQL (due to
2005   # alphabetical ordering by colname). We actually do want to
2006   # preserve this behavior so that prepare_cached has a better
2007   # chance of matching on unrelated calls
2008   my %data_reorder = map { $proto_bind->[$_][0]{_bind_data_slice_idx} => $_ } @idx_range;
2009
2010   my $fetch_row_idx = -1; # saner loop this way
2011   my $fetch_tuple = sub {
2012     return undef if ++$fetch_row_idx > $#$data;
2013
2014     return [ map
2015       { (ref $_ eq 'REF' and ref $$_ eq 'ARRAY')
2016         ? map { $_->[-1] } @{$$_}[1 .. $#$$_]
2017         : $_
2018       }
2019       map
2020         { $data->[$fetch_row_idx][$_]}
2021         sort
2022           { $data_reorder{$a} <=> $data_reorder{$b} }
2023           keys %data_reorder
2024     ];
2025   };
2026
2027   my $tuple_status = [];
2028   my ($rv, $err);
2029   try {
2030     $rv = $sth->execute_for_fetch(
2031       $fetch_tuple,
2032       $tuple_status,
2033     );
2034   }
2035   catch {
2036     $err = shift;
2037   };
2038
2039   # Not all DBDs are create equal. Some throw on error, some return
2040   # an undef $rv, and some set $sth->err - try whatever we can
2041   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
2042     ! defined $err
2043       and
2044     ( !defined $rv or $sth->err )
2045   );
2046
2047   # Statement must finish even if there was an exception.
2048   try {
2049     $sth->finish
2050   }
2051   catch {
2052     $err = shift unless defined $err
2053   };
2054
2055   if (defined $err) {
2056     my $i = 0;
2057     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
2058
2059     $self->throw_exception("Unexpected populate error: $err")
2060       if ($i > $#$tuple_status);
2061
2062     require Data::Dumper::Concise;
2063     $self->throw_exception(sprintf "execute_for_fetch() aborted with '%s' at populate slice:\n%s",
2064       ($tuple_status->[$i][1] || $err),
2065       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
2066     );
2067   }
2068
2069   return $rv;
2070 }
2071
2072 sub _dbh_execute_inserts_with_no_binds {
2073   my ($self, $sth, $count) = @_;
2074
2075   my $err;
2076   try {
2077     my $dbh = $self->_get_dbh;
2078     local $dbh->{RaiseError} = 1;
2079     local $dbh->{PrintError} = 0;
2080
2081     $sth->execute foreach 1..$count;
2082   }
2083   catch {
2084     $err = shift;
2085   };
2086
2087   # Make sure statement is finished even if there was an exception.
2088   try {
2089     $sth->finish
2090   }
2091   catch {
2092     $err = shift unless defined $err;
2093   };
2094
2095   $self->throw_exception($err) if defined $err;
2096
2097   return $count;
2098 }
2099
2100 sub update {
2101   #my ($self, $source, @args) = @_;
2102   shift->_execute('update', @_);
2103 }
2104
2105
2106 sub delete {
2107   #my ($self, $source, @args) = @_;
2108   shift->_execute('delete', @_);
2109 }
2110
2111 sub _select {
2112   my $self = shift;
2113   $self->_execute($self->_select_args(@_));
2114 }
2115
2116 sub _select_args_to_query {
2117   my $self = shift;
2118
2119   # my ($op, $ident, $select, $cond, $rs_attrs, $rows, $offset)
2120   #  = $self->_select_args($ident, $select, $cond, $attrs);
2121   my ($op, $ident, @args) =
2122     $self->_select_args(@_);
2123
2124   # my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2125   my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, \@args);
2126   $prepared_bind ||= [];
2127
2128   return wantarray
2129     ? ($sql, $prepared_bind)
2130     : \[ "($sql)", @$prepared_bind ]
2131   ;
2132 }
2133
2134 sub _select_args {
2135   my ($self, $ident, $select, $where, $attrs) = @_;
2136
2137   my $sql_maker = $self->sql_maker;
2138   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2139
2140   $attrs = {
2141     %$attrs,
2142     select => $select,
2143     from => $ident,
2144     where => $where,
2145     $rs_alias && $alias2source->{$rs_alias}
2146       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2147       : ()
2148     ,
2149   };
2150
2151   # Sanity check the attributes (SQLMaker does it too, but
2152   # in case of a software_limit we'll never reach there)
2153   if (defined $attrs->{offset}) {
2154     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2155       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2156   }
2157
2158   if (defined $attrs->{rows}) {
2159     $self->throw_exception("The rows attribute must be a positive integer if present")
2160       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2161   }
2162   elsif ($attrs->{offset}) {
2163     # MySQL actually recommends this approach.  I cringe.
2164     $attrs->{rows} = $sql_maker->__max_int;
2165   }
2166
2167   my @limit;
2168
2169   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2170   # storage, unless software limit was requested
2171   if (
2172     #limited has_many
2173     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2174        ||
2175     # grouped prefetch (to satisfy group_by == select)
2176     ( $attrs->{group_by}
2177         &&
2178       @{$attrs->{group_by}}
2179         &&
2180       $attrs->{_prefetch_selector_range}
2181     )
2182   ) {
2183     ($ident, $select, $where, $attrs)
2184       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2185   }
2186   elsif (! $attrs->{software_limit} ) {
2187     push @limit, (
2188       $attrs->{rows} || (),
2189       $attrs->{offset} || (),
2190     );
2191   }
2192
2193   # try to simplify the joinmap further (prune unreferenced type-single joins)
2194   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2195
2196 ###
2197   # This would be the point to deflate anything found in $where
2198   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2199   # expect a row object. And all we have is a resultsource (it is trivial
2200   # to extract deflator coderefs via $alias2source above).
2201   #
2202   # I don't see a way forward other than changing the way deflators are
2203   # invoked, and that's just bad...
2204 ###
2205
2206   return ('select', $ident, $select, $where, $attrs, @limit);
2207 }
2208
2209 # Returns a counting SELECT for a simple count
2210 # query. Abstracted so that a storage could override
2211 # this to { count => 'firstcol' } or whatever makes
2212 # sense as a performance optimization
2213 sub _count_select {
2214   #my ($self, $source, $rs_attrs) = @_;
2215   return { count => '*' };
2216 }
2217
2218 sub source_bind_attributes {
2219   shift->throw_exception(
2220     'source_bind_attributes() was never meant to be a callable public method - '
2221    .'please contact the DBIC dev-team and describe your use case so that a reasonable '
2222    .'solution can be provided'
2223    ."\nhttp://search.cpan.org/dist/DBIx-Class/lib/DBIx/Class.pm#GETTING_HELP/SUPPORT"
2224   );
2225 }
2226
2227 =head2 select
2228
2229 =over 4
2230
2231 =item Arguments: $ident, $select, $condition, $attrs
2232
2233 =back
2234
2235 Handle a SQL select statement.
2236
2237 =cut
2238
2239 sub select {
2240   my $self = shift;
2241   my ($ident, $select, $condition, $attrs) = @_;
2242   return $self->cursor_class->new($self, \@_, $attrs);
2243 }
2244
2245 sub select_single {
2246   my $self = shift;
2247   my ($rv, $sth, @bind) = $self->_select(@_);
2248   my @row = $sth->fetchrow_array;
2249   my @nextrow = $sth->fetchrow_array if @row;
2250   if(@row && @nextrow) {
2251     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2252   }
2253   # Need to call finish() to work round broken DBDs
2254   $sth->finish();
2255   return @row;
2256 }
2257
2258 =head2 sql_limit_dialect
2259
2260 This is an accessor for the default SQL limit dialect used by a particular
2261 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2262 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2263 see L<DBIx::Class::SQLMaker::LimitDialects>.
2264
2265 =cut
2266
2267 sub _dbh_sth {
2268   my ($self, $dbh, $sql) = @_;
2269
2270   # 3 is the if_active parameter which avoids active sth re-use
2271   my $sth = $self->disable_sth_caching
2272     ? $dbh->prepare($sql)
2273     : $dbh->prepare_cached($sql, {}, 3);
2274
2275   # XXX You would think RaiseError would make this impossible,
2276   #  but apparently that's not true :(
2277   $self->throw_exception(
2278     $dbh->errstr
2279       ||
2280     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
2281             .'an exception and/or setting $dbh->errstr',
2282       length ($sql) > 20
2283         ? substr($sql, 0, 20) . '...'
2284         : $sql
2285       ,
2286       'DBD::' . $dbh->{Driver}{Name},
2287     )
2288   ) if !$sth;
2289
2290   $sth;
2291 }
2292
2293 sub sth {
2294   carp_unique 'sth was mistakenly marked/documented as public, stop calling it (will be removed before DBIC v0.09)';
2295   shift->_sth(@_);
2296 }
2297
2298 sub _sth {
2299   my ($self, $sql) = @_;
2300   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2301 }
2302
2303 sub _dbh_columns_info_for {
2304   my ($self, $dbh, $table) = @_;
2305
2306   if ($dbh->can('column_info')) {
2307     my %result;
2308     my $caught;
2309     try {
2310       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2311       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2312       $sth->execute();
2313       while ( my $info = $sth->fetchrow_hashref() ){
2314         my %column_info;
2315         $column_info{data_type}   = $info->{TYPE_NAME};
2316         $column_info{size}      = $info->{COLUMN_SIZE};
2317         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2318         $column_info{default_value} = $info->{COLUMN_DEF};
2319         my $col_name = $info->{COLUMN_NAME};
2320         $col_name =~ s/^\"(.*)\"$/$1/;
2321
2322         $result{$col_name} = \%column_info;
2323       }
2324     } catch {
2325       $caught = 1;
2326     };
2327     return \%result if !$caught && scalar keys %result;
2328   }
2329
2330   my %result;
2331   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2332   $sth->execute;
2333   my @columns = @{$sth->{NAME_lc}};
2334   for my $i ( 0 .. $#columns ){
2335     my %column_info;
2336     $column_info{data_type} = $sth->{TYPE}->[$i];
2337     $column_info{size} = $sth->{PRECISION}->[$i];
2338     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2339
2340     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2341       $column_info{data_type} = $1;
2342       $column_info{size}    = $2;
2343     }
2344
2345     $result{$columns[$i]} = \%column_info;
2346   }
2347   $sth->finish;
2348
2349   foreach my $col (keys %result) {
2350     my $colinfo = $result{$col};
2351     my $type_num = $colinfo->{data_type};
2352     my $type_name;
2353     if(defined $type_num && $dbh->can('type_info')) {
2354       my $type_info = $dbh->type_info($type_num);
2355       $type_name = $type_info->{TYPE_NAME} if $type_info;
2356       $colinfo->{data_type} = $type_name if $type_name;
2357     }
2358   }
2359
2360   return \%result;
2361 }
2362
2363 sub columns_info_for {
2364   my ($self, $table) = @_;
2365   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2366 }
2367
2368 =head2 last_insert_id
2369
2370 Return the row id of the last insert.
2371
2372 =cut
2373
2374 sub _dbh_last_insert_id {
2375     my ($self, $dbh, $source, $col) = @_;
2376
2377     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2378
2379     return $id if defined $id;
2380
2381     my $class = ref $self;
2382     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2383 }
2384
2385 sub last_insert_id {
2386   my $self = shift;
2387   $self->_dbh_last_insert_id ($self->_dbh, @_);
2388 }
2389
2390 =head2 _native_data_type
2391
2392 =over 4
2393
2394 =item Arguments: $type_name
2395
2396 =back
2397
2398 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2399 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2400 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2401
2402 The default implementation returns C<undef>, implement in your Storage driver if
2403 you need this functionality.
2404
2405 Should map types from other databases to the native RDBMS type, for example
2406 C<VARCHAR2> to C<VARCHAR>.
2407
2408 Types with modifiers should map to the underlying data type. For example,
2409 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2410
2411 Composite types should map to the container type, for example
2412 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2413
2414 =cut
2415
2416 sub _native_data_type {
2417   #my ($self, $data_type) = @_;
2418   return undef
2419 }
2420
2421 # Check if placeholders are supported at all
2422 sub _determine_supports_placeholders {
2423   my $self = shift;
2424   my $dbh  = $self->_get_dbh;
2425
2426   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2427   # but it is inaccurate more often than not
2428   return try {
2429     local $dbh->{PrintError} = 0;
2430     local $dbh->{RaiseError} = 1;
2431     $dbh->do('select ?', {}, 1);
2432     1;
2433   }
2434   catch {
2435     0;
2436   };
2437 }
2438
2439 # Check if placeholders bound to non-string types throw exceptions
2440 #
2441 sub _determine_supports_typeless_placeholders {
2442   my $self = shift;
2443   my $dbh  = $self->_get_dbh;
2444
2445   return try {
2446     local $dbh->{PrintError} = 0;
2447     local $dbh->{RaiseError} = 1;
2448     # this specifically tests a bind that is NOT a string
2449     $dbh->do('select 1 where 1 = ?', {}, 1);
2450     1;
2451   }
2452   catch {
2453     0;
2454   };
2455 }
2456
2457 =head2 sqlt_type
2458
2459 Returns the database driver name.
2460
2461 =cut
2462
2463 sub sqlt_type {
2464   shift->_get_dbh->{Driver}->{Name};
2465 }
2466
2467 =head2 bind_attribute_by_data_type
2468
2469 Given a datatype from column info, returns a database specific bind
2470 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2471 let the database planner just handle it.
2472
2473 Generally only needed for special case column types, like bytea in postgres.
2474
2475 =cut
2476
2477 sub bind_attribute_by_data_type {
2478     return;
2479 }
2480
2481 =head2 is_datatype_numeric
2482
2483 Given a datatype from column_info, returns a boolean value indicating if
2484 the current RDBMS considers it a numeric value. This controls how
2485 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2486 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2487 be performed instead of the usual C<eq>.
2488
2489 =cut
2490
2491 sub is_datatype_numeric {
2492   #my ($self, $dt) = @_;
2493
2494   return 0 unless $_[1];
2495
2496   $_[1] =~ /^ (?:
2497     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2498   ) $/ix;
2499 }
2500
2501
2502 =head2 create_ddl_dir
2503
2504 =over 4
2505
2506 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2507
2508 =back
2509
2510 Creates a SQL file based on the Schema, for each of the specified
2511 database engines in C<\@databases> in the given directory.
2512 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2513
2514 Given a previous version number, this will also create a file containing
2515 the ALTER TABLE statements to transform the previous schema into the
2516 current one. Note that these statements may contain C<DROP TABLE> or
2517 C<DROP COLUMN> statements that can potentially destroy data.
2518
2519 The file names are created using the C<ddl_filename> method below, please
2520 override this method in your schema if you would like a different file
2521 name format. For the ALTER file, the same format is used, replacing
2522 $version in the name with "$preversion-$version".
2523
2524 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2525 The most common value for this would be C<< { add_drop_table => 1 } >>
2526 to have the SQL produced include a C<DROP TABLE> statement for each table
2527 created. For quoting purposes supply C<quote_table_names> and
2528 C<quote_field_names>.
2529
2530 If no arguments are passed, then the following default values are assumed:
2531
2532 =over 4
2533
2534 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2535
2536 =item version    - $schema->schema_version
2537
2538 =item directory  - './'
2539
2540 =item preversion - <none>
2541
2542 =back
2543
2544 By default, C<\%sqlt_args> will have
2545
2546  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2547
2548 merged with the hash passed in. To disable any of those features, pass in a
2549 hashref like the following
2550
2551  { ignore_constraint_names => 0, # ... other options }
2552
2553
2554 WARNING: You are strongly advised to check all SQL files created, before applying
2555 them.
2556
2557 =cut
2558
2559 sub create_ddl_dir {
2560   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2561
2562   unless ($dir) {
2563     carp "No directory given, using ./\n";
2564     $dir = './';
2565   } else {
2566       -d $dir
2567         or
2568       (require File::Path and File::Path::make_path ("$dir"))  # make_path does not like objects (i.e. Path::Class::Dir)
2569         or
2570       $self->throw_exception(
2571         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2572       );
2573   }
2574
2575   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2576
2577   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2578   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2579
2580   my $schema_version = $schema->schema_version || '1.x';
2581   $version ||= $schema_version;
2582
2583   $sqltargs = {
2584     add_drop_table => 1,
2585     ignore_constraint_names => 1,
2586     ignore_index_names => 1,
2587     %{$sqltargs || {}}
2588   };
2589
2590   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2591     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2592   }
2593
2594   my $sqlt = SQL::Translator->new( $sqltargs );
2595
2596   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2597   my $sqlt_schema = $sqlt->translate({ data => $schema })
2598     or $self->throw_exception ($sqlt->error);
2599
2600   foreach my $db (@$databases) {
2601     $sqlt->reset();
2602     $sqlt->{schema} = $sqlt_schema;
2603     $sqlt->producer($db);
2604
2605     my $file;
2606     my $filename = $schema->ddl_filename($db, $version, $dir);
2607     if (-e $filename && ($version eq $schema_version )) {
2608       # if we are dumping the current version, overwrite the DDL
2609       carp "Overwriting existing DDL file - $filename";
2610       unlink($filename);
2611     }
2612
2613     my $output = $sqlt->translate;
2614     if(!$output) {
2615       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2616       next;
2617     }
2618     if(!open($file, ">$filename")) {
2619       $self->throw_exception("Can't open $filename for writing ($!)");
2620       next;
2621     }
2622     print $file $output;
2623     close($file);
2624
2625     next unless ($preversion);
2626
2627     require SQL::Translator::Diff;
2628
2629     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2630     if(!-e $prefilename) {
2631       carp("No previous schema file found ($prefilename)");
2632       next;
2633     }
2634
2635     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2636     if(-e $difffile) {
2637       carp("Overwriting existing diff file - $difffile");
2638       unlink($difffile);
2639     }
2640
2641     my $source_schema;
2642     {
2643       my $t = SQL::Translator->new($sqltargs);
2644       $t->debug( 0 );
2645       $t->trace( 0 );
2646
2647       $t->parser( $db )
2648         or $self->throw_exception ($t->error);
2649
2650       my $out = $t->translate( $prefilename )
2651         or $self->throw_exception ($t->error);
2652
2653       $source_schema = $t->schema;
2654
2655       $source_schema->name( $prefilename )
2656         unless ( $source_schema->name );
2657     }
2658
2659     # The "new" style of producers have sane normalization and can support
2660     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2661     # And we have to diff parsed SQL against parsed SQL.
2662     my $dest_schema = $sqlt_schema;
2663
2664     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2665       my $t = SQL::Translator->new($sqltargs);
2666       $t->debug( 0 );
2667       $t->trace( 0 );
2668
2669       $t->parser( $db )
2670         or $self->throw_exception ($t->error);
2671
2672       my $out = $t->translate( $filename )
2673         or $self->throw_exception ($t->error);
2674
2675       $dest_schema = $t->schema;
2676
2677       $dest_schema->name( $filename )
2678         unless $dest_schema->name;
2679     }
2680
2681     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2682                                                   $dest_schema,   $db,
2683                                                   $sqltargs
2684                                                  );
2685     if(!open $file, ">$difffile") {
2686       $self->throw_exception("Can't write to $difffile ($!)");
2687       next;
2688     }
2689     print $file $diff;
2690     close($file);
2691   }
2692 }
2693
2694 =head2 deployment_statements
2695
2696 =over 4
2697
2698 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2699
2700 =back
2701
2702 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2703
2704 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2705 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2706
2707 C<$directory> is used to return statements from files in a previously created
2708 L</create_ddl_dir> directory and is optional. The filenames are constructed
2709 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2710
2711 If no C<$directory> is specified then the statements are constructed on the
2712 fly using L<SQL::Translator> and C<$version> is ignored.
2713
2714 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2715
2716 =cut
2717
2718 sub deployment_statements {
2719   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2720   $type ||= $self->sqlt_type;
2721   $version ||= $schema->schema_version || '1.x';
2722   $dir ||= './';
2723   my $filename = $schema->ddl_filename($type, $version, $dir);
2724   if(-f $filename)
2725   {
2726       # FIXME replace this block when a proper sane sql parser is available
2727       my $file;
2728       open($file, "<$filename")
2729         or $self->throw_exception("Can't open $filename ($!)");
2730       my @rows = <$file>;
2731       close($file);
2732       return join('', @rows);
2733   }
2734
2735   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2736     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2737   }
2738
2739   # sources needs to be a parser arg, but for simplicty allow at top level
2740   # coming in
2741   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2742       if exists $sqltargs->{sources};
2743
2744   my $tr = SQL::Translator->new(
2745     producer => "SQL::Translator::Producer::${type}",
2746     %$sqltargs,
2747     parser => 'SQL::Translator::Parser::DBIx::Class',
2748     data => $schema,
2749   );
2750
2751   my @ret;
2752   if (wantarray) {
2753     @ret = $tr->translate;
2754   }
2755   else {
2756     $ret[0] = $tr->translate;
2757   }
2758
2759   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2760     unless (@ret && defined $ret[0]);
2761
2762   return wantarray ? @ret : $ret[0];
2763 }
2764
2765 # FIXME deploy() currently does not accurately report sql errors
2766 # Will always return true while errors are warned
2767 sub deploy {
2768   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2769   my $deploy = sub {
2770     my $line = shift;
2771     return if(!$line);
2772     return if($line =~ /^--/);
2773     # next if($line =~ /^DROP/m);
2774     return if($line =~ /^BEGIN TRANSACTION/m);
2775     return if($line =~ /^COMMIT/m);
2776     return if $line =~ /^\s+$/; # skip whitespace only
2777     $self->_query_start($line);
2778     try {
2779       # do a dbh_do cycle here, as we need some error checking in
2780       # place (even though we will ignore errors)
2781       $self->dbh_do (sub { $_[1]->do($line) });
2782     } catch {
2783       carp qq{$_ (running "${line}")};
2784     };
2785     $self->_query_end($line);
2786   };
2787   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2788   if (@statements > 1) {
2789     foreach my $statement (@statements) {
2790       $deploy->( $statement );
2791     }
2792   }
2793   elsif (@statements == 1) {
2794     # split on single line comments and end of statements
2795     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
2796       $deploy->( $line );
2797     }
2798   }
2799 }
2800
2801 =head2 datetime_parser
2802
2803 Returns the datetime parser class
2804
2805 =cut
2806
2807 sub datetime_parser {
2808   my $self = shift;
2809   return $self->{datetime_parser} ||= do {
2810     $self->build_datetime_parser(@_);
2811   };
2812 }
2813
2814 =head2 datetime_parser_type
2815
2816 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
2817
2818 =head2 build_datetime_parser
2819
2820 See L</datetime_parser>
2821
2822 =cut
2823
2824 sub build_datetime_parser {
2825   my $self = shift;
2826   my $type = $self->datetime_parser_type(@_);
2827   return $type;
2828 }
2829
2830
2831 =head2 is_replicating
2832
2833 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2834 replicate from a master database.  Default is undef, which is the result
2835 returned by databases that don't support replication.
2836
2837 =cut
2838
2839 sub is_replicating {
2840     return;
2841
2842 }
2843
2844 =head2 lag_behind_master
2845
2846 Returns a number that represents a certain amount of lag behind a master db
2847 when a given storage is replicating.  The number is database dependent, but
2848 starts at zero and increases with the amount of lag. Default in undef
2849
2850 =cut
2851
2852 sub lag_behind_master {
2853     return;
2854 }
2855
2856 =head2 relname_to_table_alias
2857
2858 =over 4
2859
2860 =item Arguments: $relname, $join_count
2861
2862 =back
2863
2864 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2865 queries.
2866
2867 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2868 way these aliases are named.
2869
2870 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2871 otherwise C<"$relname">.
2872
2873 =cut
2874
2875 sub relname_to_table_alias {
2876   my ($self, $relname, $join_count) = @_;
2877
2878   my $alias = ($join_count && $join_count > 1 ?
2879     join('_', $relname, $join_count) : $relname);
2880
2881   return $alias;
2882 }
2883
2884 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2885 # version and it may be necessary to amend or override it for a specific storage
2886 # if such binds are necessary.
2887 sub _max_column_bytesize {
2888   my ($self, $attr) = @_;
2889
2890   my $max_size;
2891
2892   if ($attr->{sqlt_datatype}) {
2893     my $data_type = lc($attr->{sqlt_datatype});
2894
2895     if ($attr->{sqlt_size}) {
2896
2897       # String/sized-binary types
2898       if ($data_type =~ /^(?:
2899           l? (?:var)? char(?:acter)? (?:\s*varying)?
2900             |
2901           (?:var)? binary (?:\s*varying)?
2902             |
2903           raw
2904         )\b/x
2905       ) {
2906         $max_size = $attr->{sqlt_size};
2907       }
2908       # Other charset/unicode types, assume scale of 4
2909       elsif ($data_type =~ /^(?:
2910           national \s* character (?:\s*varying)?
2911             |
2912           nchar
2913             |
2914           univarchar
2915             |
2916           nvarchar
2917         )\b/x
2918       ) {
2919         $max_size = $attr->{sqlt_size} * 4;
2920       }
2921     }
2922
2923     if (!$max_size and !$self->_is_lob_type($data_type)) {
2924       $max_size = 100 # for all other (numeric?) datatypes
2925     }
2926   }
2927
2928   $max_size || $self->_dbic_connect_attributes->{LongReadLen} || $self->_get_dbh->{LongReadLen} || 8000;
2929 }
2930
2931 # Determine if a data_type is some type of BLOB
2932 sub _is_lob_type {
2933   my ($self, $data_type) = @_;
2934   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
2935     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
2936                                   |varchar|character\s*varying|nvarchar
2937                                   |national\s*character\s*varying))?\z/xi);
2938 }
2939
2940 sub _is_binary_lob_type {
2941   my ($self, $data_type) = @_;
2942   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
2943     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
2944 }
2945
2946 sub _is_text_lob_type {
2947   my ($self, $data_type) = @_;
2948   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
2949     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
2950                         |national\s*character\s*varying))\z/xi);
2951 }
2952
2953 1;
2954
2955 =head1 USAGE NOTES
2956
2957 =head2 DBIx::Class and AutoCommit
2958
2959 DBIx::Class can do some wonderful magic with handling exceptions,
2960 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2961 (the default) combined with L<txn_do|DBIx::Class::Storage/txn_do> for
2962 transaction support.
2963
2964 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2965 in an assumed transaction between commits, and you're telling us you'd
2966 like to manage that manually.  A lot of the magic protections offered by
2967 this module will go away.  We can't protect you from exceptions due to database
2968 disconnects because we don't know anything about how to restart your
2969 transactions.  You're on your own for handling all sorts of exceptional
2970 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2971 be with raw DBI.
2972
2973
2974 =head1 AUTHORS
2975
2976 Matt S. Trout <mst@shadowcatsystems.co.uk>
2977
2978 Andy Grundman <andy@hybridized.org>
2979
2980 =head1 LICENSE
2981
2982 You may distribute this code under the same terms as Perl itself.
2983
2984 =cut