e9fab0b222f1d827c4db47b5563e04c24edaa1d9
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use DBIx::Class::Exception;
12 use Scalar::Util qw/refaddr weaken reftype blessed/;
13 use List::Util qw/first/;
14 use Sub::Name 'subname';
15 use Try::Tiny;
16 use overload ();
17 use Data::Compare (); # no imports!!! guard against insane architecture
18 use namespace::clean;
19
20 # default cursor class, overridable in connect_info attributes
21 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
22
23 __PACKAGE__->mk_group_accessors('inherited' => qw/
24   sql_limit_dialect sql_quote_char sql_name_sep
25 /);
26
27 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
28
29 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
30 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
31
32 __PACKAGE__->sql_name_sep('.');
33
34 __PACKAGE__->mk_group_accessors('simple' => qw/
35   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
36   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts _dbh_autocommit
37 /);
38
39 # the values for these accessors are picked out (and deleted) from
40 # the attribute hashref passed to connect_info
41 my @storage_options = qw/
42   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
43   disable_sth_caching unsafe auto_savepoint
44 /;
45 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
46
47
48 # capability definitions, using a 2-tiered accessor system
49 # The rationale is:
50 #
51 # A driver/user may define _use_X, which blindly without any checks says:
52 # "(do not) use this capability", (use_dbms_capability is an "inherited"
53 # type accessor)
54 #
55 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
56 # accessor, which in turn calls _determine_supports_X, and stores the return
57 # in a special slot on the storage object, which is wiped every time a $dbh
58 # reconnection takes place (it is not guaranteed that upon reconnection we
59 # will get the same rdbms version). _determine_supports_X does not need to
60 # exist on a driver, as we ->can for it before calling.
61
62 my @capabilities = (qw/
63   insert_returning
64   insert_returning_bound
65   placeholders
66   typeless_placeholders
67   join_optimizer
68 /);
69 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
70 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
71
72 # on by default, not strictly a capability (pending rewrite)
73 __PACKAGE__->_use_join_optimizer (1);
74 sub _determine_supports_join_optimizer { 1 };
75
76 # Each of these methods need _determine_driver called before itself
77 # in order to function reliably. This is a purely DRY optimization
78 #
79 # get_(use)_dbms_capability need to be called on the correct Storage
80 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
81 # _determine_supports_X which obv. needs a correct driver as well
82 my @rdbms_specific_methods = qw/
83   deployment_statements
84   sqlt_type
85   sql_maker
86   build_datetime_parser
87   datetime_parser_type
88
89   txn_begin
90   insert
91   insert_bulk
92   update
93   delete
94   select
95   select_single
96   with_deferred_fk_checks
97
98   get_use_dbms_capability
99   get_dbms_capability
100
101   _server_info
102   _get_server_version
103 /;
104
105 for my $meth (@rdbms_specific_methods) {
106
107   my $orig = __PACKAGE__->can ($meth)
108     or die "$meth is not a ::Storage::DBI method!";
109
110   no strict qw/refs/;
111   no warnings qw/redefine/;
112   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
113     if (
114       # only fire when invoked on an instance, a valid class-based invocation
115       # would e.g. be setting a default for an inherited accessor
116       ref $_[0]
117         and
118       ! $_[0]->_driver_determined
119         and
120       ! $_[0]->{_in_determine_driver}
121     ) {
122       $_[0]->_determine_driver;
123
124       # This for some reason crashes and burns on perl 5.8.1
125       # IFF the method ends up throwing an exception
126       #goto $_[0]->can ($meth);
127
128       my $cref = $_[0]->can ($meth);
129       goto $cref;
130     }
131
132     goto $orig;
133   };
134 }
135
136 =head1 NAME
137
138 DBIx::Class::Storage::DBI - DBI storage handler
139
140 =head1 SYNOPSIS
141
142   my $schema = MySchema->connect('dbi:SQLite:my.db');
143
144   $schema->storage->debug(1);
145
146   my @stuff = $schema->storage->dbh_do(
147     sub {
148       my ($storage, $dbh, @args) = @_;
149       $dbh->do("DROP TABLE authors");
150     },
151     @column_list
152   );
153
154   $schema->resultset('Book')->search({
155      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
156   });
157
158 =head1 DESCRIPTION
159
160 This class represents the connection to an RDBMS via L<DBI>.  See
161 L<DBIx::Class::Storage> for general information.  This pod only
162 documents DBI-specific methods and behaviors.
163
164 =head1 METHODS
165
166 =cut
167
168 sub new {
169   my $new = shift->next::method(@_);
170
171   $new->_sql_maker_opts({});
172   $new->_dbh_details({});
173   $new->{_in_do_block} = 0;
174   $new->{_dbh_gen} = 0;
175
176   # read below to see what this does
177   $new->_arm_global_destructor;
178
179   $new;
180 }
181
182 # This is hack to work around perl shooting stuff in random
183 # order on exit(). If we do not walk the remaining storage
184 # objects in an END block, there is a *small but real* chance
185 # of a fork()ed child to kill the parent's shared DBI handle,
186 # *before perl reaches the DESTROY in this package*
187 # Yes, it is ugly and effective.
188 # Additionally this registry is used by the CLONE method to
189 # make sure no handles are shared between threads
190 {
191   my %seek_and_destroy;
192
193   sub _arm_global_destructor {
194     my $self = shift;
195     my $key = refaddr ($self);
196     $seek_and_destroy{$key} = $self;
197     weaken ($seek_and_destroy{$key});
198   }
199
200   END {
201     local $?; # just in case the DBI destructor changes it somehow
202
203     # destroy just the object if not native to this process/thread
204     $_->_verify_pid for (grep
205       { defined $_ }
206       values %seek_and_destroy
207     );
208   }
209
210   sub CLONE {
211     # As per DBI's recommendation, DBIC disconnects all handles as
212     # soon as possible (DBIC will reconnect only on demand from within
213     # the thread)
214     for (values %seek_and_destroy) {
215       next unless $_;
216       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
217       $_->_dbh(undef);
218
219       $_->transaction_depth(0);
220       $_->savepoints([]);
221     }
222   }
223 }
224
225 sub DESTROY {
226   my $self = shift;
227
228   # some databases spew warnings on implicit disconnect
229   local $SIG{__WARN__} = sub {};
230   $self->_dbh(undef);
231
232   # this op is necessary, since the very last perl runtime statement
233   # triggers a global destruction shootout, and the $SIG localization
234   # may very well be destroyed before perl actually gets to do the
235   # $dbh undef
236   1;
237 }
238
239 # handle pid changes correctly - do not destroy parent's connection
240 sub _verify_pid {
241   my $self = shift;
242
243   my $pid = $self->_conn_pid;
244   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
245     $dbh->{InactiveDestroy} = 1;
246     $self->{_dbh_gen}++;
247     $self->_dbh(undef);
248     $self->transaction_depth(0);
249     $self->savepoints([]);
250   }
251
252   return;
253 }
254
255 =head2 connect_info
256
257 This method is normally called by L<DBIx::Class::Schema/connection>, which
258 encapsulates its argument list in an arrayref before passing them here.
259
260 The argument list may contain:
261
262 =over
263
264 =item *
265
266 The same 4-element argument set one would normally pass to
267 L<DBI/connect>, optionally followed by
268 L<extra attributes|/DBIx::Class specific connection attributes>
269 recognized by DBIx::Class:
270
271   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
272
273 =item *
274
275 A single code reference which returns a connected
276 L<DBI database handle|DBI/connect> optionally followed by
277 L<extra attributes|/DBIx::Class specific connection attributes> recognized
278 by DBIx::Class:
279
280   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
281
282 =item *
283
284 A single hashref with all the attributes and the dsn/user/password
285 mixed together:
286
287   $connect_info_args = [{
288     dsn => $dsn,
289     user => $user,
290     password => $pass,
291     %dbi_attributes,
292     %extra_attributes,
293   }];
294
295   $connect_info_args = [{
296     dbh_maker => sub { DBI->connect (...) },
297     %dbi_attributes,
298     %extra_attributes,
299   }];
300
301 This is particularly useful for L<Catalyst> based applications, allowing the
302 following config (L<Config::General> style):
303
304   <Model::DB>
305     schema_class   App::DB
306     <connect_info>
307       dsn          dbi:mysql:database=test
308       user         testuser
309       password     TestPass
310       AutoCommit   1
311     </connect_info>
312   </Model::DB>
313
314 The C<dsn>/C<user>/C<password> combination can be substituted by the
315 C<dbh_maker> key whose value is a coderef that returns a connected
316 L<DBI database handle|DBI/connect>
317
318 =back
319
320 Please note that the L<DBI> docs recommend that you always explicitly
321 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
322 recommends that it be set to I<1>, and that you perform transactions
323 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
324 to I<1> if you do not do explicitly set it to zero.  This is the default
325 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
326
327 =head3 DBIx::Class specific connection attributes
328
329 In addition to the standard L<DBI|DBI/ATTRIBUTES COMMON TO ALL HANDLES>
330 L<connection|DBI/Database Handle Attributes> attributes, DBIx::Class recognizes
331 the following connection options. These options can be mixed in with your other
332 L<DBI> connection attributes, or placed in a separate hashref
333 (C<\%extra_attributes>) as shown above.
334
335 Every time C<connect_info> is invoked, any previous settings for
336 these options will be cleared before setting the new ones, regardless of
337 whether any options are specified in the new C<connect_info>.
338
339
340 =over
341
342 =item on_connect_do
343
344 Specifies things to do immediately after connecting or re-connecting to
345 the database.  Its value may contain:
346
347 =over
348
349 =item a scalar
350
351 This contains one SQL statement to execute.
352
353 =item an array reference
354
355 This contains SQL statements to execute in order.  Each element contains
356 a string or a code reference that returns a string.
357
358 =item a code reference
359
360 This contains some code to execute.  Unlike code references within an
361 array reference, its return value is ignored.
362
363 =back
364
365 =item on_disconnect_do
366
367 Takes arguments in the same form as L</on_connect_do> and executes them
368 immediately before disconnecting from the database.
369
370 Note, this only runs if you explicitly call L</disconnect> on the
371 storage object.
372
373 =item on_connect_call
374
375 A more generalized form of L</on_connect_do> that calls the specified
376 C<connect_call_METHOD> methods in your storage driver.
377
378   on_connect_do => 'select 1'
379
380 is equivalent to:
381
382   on_connect_call => [ [ do_sql => 'select 1' ] ]
383
384 Its values may contain:
385
386 =over
387
388 =item a scalar
389
390 Will call the C<connect_call_METHOD> method.
391
392 =item a code reference
393
394 Will execute C<< $code->($storage) >>
395
396 =item an array reference
397
398 Each value can be a method name or code reference.
399
400 =item an array of arrays
401
402 For each array, the first item is taken to be the C<connect_call_> method name
403 or code reference, and the rest are parameters to it.
404
405 =back
406
407 Some predefined storage methods you may use:
408
409 =over
410
411 =item do_sql
412
413 Executes a SQL string or a code reference that returns a SQL string. This is
414 what L</on_connect_do> and L</on_disconnect_do> use.
415
416 It can take:
417
418 =over
419
420 =item a scalar
421
422 Will execute the scalar as SQL.
423
424 =item an arrayref
425
426 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
427 attributes hashref and bind values.
428
429 =item a code reference
430
431 Will execute C<< $code->($storage) >> and execute the return array refs as
432 above.
433
434 =back
435
436 =item datetime_setup
437
438 Execute any statements necessary to initialize the database session to return
439 and accept datetime/timestamp values used with
440 L<DBIx::Class::InflateColumn::DateTime>.
441
442 Only necessary for some databases, see your specific storage driver for
443 implementation details.
444
445 =back
446
447 =item on_disconnect_call
448
449 Takes arguments in the same form as L</on_connect_call> and executes them
450 immediately before disconnecting from the database.
451
452 Calls the C<disconnect_call_METHOD> methods as opposed to the
453 C<connect_call_METHOD> methods called by L</on_connect_call>.
454
455 Note, this only runs if you explicitly call L</disconnect> on the
456 storage object.
457
458 =item disable_sth_caching
459
460 If set to a true value, this option will disable the caching of
461 statement handles via L<DBI/prepare_cached>.
462
463 =item limit_dialect
464
465 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
466 default L</sql_limit_dialect> setting of the storage (if any). For a list
467 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
468
469 =item quote_names
470
471 When true automatically sets L</quote_char> and L</name_sep> to the characters
472 appropriate for your particular RDBMS. This option is preferred over specifying
473 L</quote_char> directly.
474
475 =item quote_char
476
477 Specifies what characters to use to quote table and column names.
478
479 C<quote_char> expects either a single character, in which case is it
480 is placed on either side of the table/column name, or an arrayref of length
481 2 in which case the table/column name is placed between the elements.
482
483 For example under MySQL you should use C<< quote_char => '`' >>, and for
484 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
485
486 =item name_sep
487
488 This parameter is only useful in conjunction with C<quote_char>, and is used to
489 specify the character that separates elements (schemas, tables, columns) from
490 each other. If unspecified it defaults to the most commonly used C<.>.
491
492 =item unsafe
493
494 This Storage driver normally installs its own C<HandleError>, sets
495 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
496 all database handles, including those supplied by a coderef.  It does this
497 so that it can have consistent and useful error behavior.
498
499 If you set this option to a true value, Storage will not do its usual
500 modifications to the database handle's attributes, and instead relies on
501 the settings in your connect_info DBI options (or the values you set in
502 your connection coderef, in the case that you are connecting via coderef).
503
504 Note that your custom settings can cause Storage to malfunction,
505 especially if you set a C<HandleError> handler that suppresses exceptions
506 and/or disable C<RaiseError>.
507
508 =item auto_savepoint
509
510 If this option is true, L<DBIx::Class> will use savepoints when nesting
511 transactions, making it possible to recover from failure in the inner
512 transaction without having to abort all outer transactions.
513
514 =item cursor_class
515
516 Use this argument to supply a cursor class other than the default
517 L<DBIx::Class::Storage::DBI::Cursor>.
518
519 =back
520
521 Some real-life examples of arguments to L</connect_info> and
522 L<DBIx::Class::Schema/connect>
523
524   # Simple SQLite connection
525   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
526
527   # Connect via subref
528   ->connect_info([ sub { DBI->connect(...) } ]);
529
530   # Connect via subref in hashref
531   ->connect_info([{
532     dbh_maker => sub { DBI->connect(...) },
533     on_connect_do => 'alter session ...',
534   }]);
535
536   # A bit more complicated
537   ->connect_info(
538     [
539       'dbi:Pg:dbname=foo',
540       'postgres',
541       'my_pg_password',
542       { AutoCommit => 1 },
543       { quote_char => q{"} },
544     ]
545   );
546
547   # Equivalent to the previous example
548   ->connect_info(
549     [
550       'dbi:Pg:dbname=foo',
551       'postgres',
552       'my_pg_password',
553       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
554     ]
555   );
556
557   # Same, but with hashref as argument
558   # See parse_connect_info for explanation
559   ->connect_info(
560     [{
561       dsn         => 'dbi:Pg:dbname=foo',
562       user        => 'postgres',
563       password    => 'my_pg_password',
564       AutoCommit  => 1,
565       quote_char  => q{"},
566       name_sep    => q{.},
567     }]
568   );
569
570   # Subref + DBIx::Class-specific connection options
571   ->connect_info(
572     [
573       sub { DBI->connect(...) },
574       {
575           quote_char => q{`},
576           name_sep => q{@},
577           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
578           disable_sth_caching => 1,
579       },
580     ]
581   );
582
583
584
585 =cut
586
587 sub connect_info {
588   my ($self, $info) = @_;
589
590   return $self->_connect_info if !$info;
591
592   $self->_connect_info($info); # copy for _connect_info
593
594   $info = $self->_normalize_connect_info($info)
595     if ref $info eq 'ARRAY';
596
597   for my $storage_opt (keys %{ $info->{storage_options} }) {
598     my $value = $info->{storage_options}{$storage_opt};
599
600     $self->$storage_opt($value);
601   }
602
603   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
604   #  the new set of options
605   $self->_sql_maker(undef);
606   $self->_sql_maker_opts({});
607
608   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
609     my $value = $info->{sql_maker_options}{$sql_maker_opt};
610
611     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
612   }
613
614   my %attrs = (
615     %{ $self->_default_dbi_connect_attributes || {} },
616     %{ $info->{attributes} || {} },
617   );
618
619   my @args = @{ $info->{arguments} };
620
621   if (keys %attrs and ref $args[0] ne 'CODE') {
622     carp
623         'You provided explicit AutoCommit => 0 in your connection_info. '
624       . 'This is almost universally a bad idea (see the footnotes of '
625       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
626       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
627       . 'this warning.'
628       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
629
630     push @args, \%attrs if keys %attrs;
631   }
632   $self->_dbi_connect_info(\@args);
633
634   # FIXME - dirty:
635   # save attributes them in a separate accessor so they are always
636   # introspectable, even in case of a CODE $dbhmaker
637   $self->_dbic_connect_attributes (\%attrs);
638
639   return $self->_connect_info;
640 }
641
642 sub _normalize_connect_info {
643   my ($self, $info_arg) = @_;
644   my %info;
645
646   my @args = @$info_arg;  # take a shallow copy for further mutilation
647
648   # combine/pre-parse arguments depending on invocation style
649
650   my %attrs;
651   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
652     %attrs = %{ $args[1] || {} };
653     @args = $args[0];
654   }
655   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
656     %attrs = %{$args[0]};
657     @args = ();
658     if (my $code = delete $attrs{dbh_maker}) {
659       @args = $code;
660
661       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
662       if (@ignored) {
663         carp sprintf (
664             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
665           . "to the result of 'dbh_maker'",
666
667           join (', ', map { "'$_'" } (@ignored) ),
668         );
669       }
670     }
671     else {
672       @args = delete @attrs{qw/dsn user password/};
673     }
674   }
675   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
676     %attrs = (
677       % { $args[3] || {} },
678       % { $args[4] || {} },
679     );
680     @args = @args[0,1,2];
681   }
682
683   $info{arguments} = \@args;
684
685   my @storage_opts = grep exists $attrs{$_},
686     @storage_options, 'cursor_class';
687
688   @{ $info{storage_options} }{@storage_opts} =
689     delete @attrs{@storage_opts} if @storage_opts;
690
691   my @sql_maker_opts = grep exists $attrs{$_},
692     qw/limit_dialect quote_char name_sep quote_names/;
693
694   @{ $info{sql_maker_options} }{@sql_maker_opts} =
695     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
696
697   $info{attributes} = \%attrs if %attrs;
698
699   return \%info;
700 }
701
702 sub _default_dbi_connect_attributes () {
703   +{
704     AutoCommit => 1,
705     PrintError => 0,
706     RaiseError => 1,
707     ShowErrorStatement => 1,
708   };
709 }
710
711 =head2 on_connect_do
712
713 This method is deprecated in favour of setting via L</connect_info>.
714
715 =cut
716
717 =head2 on_disconnect_do
718
719 This method is deprecated in favour of setting via L</connect_info>.
720
721 =cut
722
723 sub _parse_connect_do {
724   my ($self, $type) = @_;
725
726   my $val = $self->$type;
727   return () if not defined $val;
728
729   my @res;
730
731   if (not ref($val)) {
732     push @res, [ 'do_sql', $val ];
733   } elsif (ref($val) eq 'CODE') {
734     push @res, $val;
735   } elsif (ref($val) eq 'ARRAY') {
736     push @res, map { [ 'do_sql', $_ ] } @$val;
737   } else {
738     $self->throw_exception("Invalid type for $type: ".ref($val));
739   }
740
741   return \@res;
742 }
743
744 =head2 dbh_do
745
746 Arguments: ($subref | $method_name), @extra_coderef_args?
747
748 Execute the given $subref or $method_name using the new exception-based
749 connection management.
750
751 The first two arguments will be the storage object that C<dbh_do> was called
752 on and a database handle to use.  Any additional arguments will be passed
753 verbatim to the called subref as arguments 2 and onwards.
754
755 Using this (instead of $self->_dbh or $self->dbh) ensures correct
756 exception handling and reconnection (or failover in future subclasses).
757
758 Your subref should have no side-effects outside of the database, as
759 there is the potential for your subref to be partially double-executed
760 if the database connection was stale/dysfunctional.
761
762 Example:
763
764   my @stuff = $schema->storage->dbh_do(
765     sub {
766       my ($storage, $dbh, @cols) = @_;
767       my $cols = join(q{, }, @cols);
768       $dbh->selectrow_array("SELECT $cols FROM foo");
769     },
770     @column_list
771   );
772
773 =cut
774
775 sub dbh_do {
776   my $self = shift;
777   my $code = shift;
778
779   my $dbh = $self->_get_dbh;
780
781   return $self->$code($dbh, @_)
782     if ( $self->{_in_do_block} || $self->{transaction_depth} );
783
784   local $self->{_in_do_block} = 1;
785
786   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
787   my $args = \@_;
788
789   try {
790     $self->$code ($dbh, @$args);
791   } catch {
792     $self->throw_exception($_) if $self->connected;
793
794     # We were not connected - reconnect and retry, but let any
795     #  exception fall right through this time
796     carp "Retrying dbh_do($code) after catching disconnected exception: $_"
797       if $ENV{DBIC_STORAGE_RETRY_DEBUG};
798
799     $self->_populate_dbh;
800     $self->$code($self->_dbh, @$args);
801   };
802 }
803
804 sub txn_do {
805   # connects or reconnects on pid change, necessary to grab correct txn_depth
806   $_[0]->_get_dbh;
807   local $_[0]->{_in_do_block} = 1;
808   shift->next::method(@_);
809 }
810
811 =head2 disconnect
812
813 Our C<disconnect> method also performs a rollback first if the
814 database is not in C<AutoCommit> mode.
815
816 =cut
817
818 sub disconnect {
819   my ($self) = @_;
820
821   if( $self->_dbh ) {
822     my @actions;
823
824     push @actions, ( $self->on_disconnect_call || () );
825     push @actions, $self->_parse_connect_do ('on_disconnect_do');
826
827     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
828
829     # stops the "implicit rollback on disconnect" warning
830     $self->_exec_txn_rollback unless $self->_dbh_autocommit;
831
832     %{ $self->_dbh->{CachedKids} } = ();
833     $self->_dbh->disconnect;
834     $self->_dbh(undef);
835     $self->{_dbh_gen}++;
836   }
837 }
838
839 =head2 with_deferred_fk_checks
840
841 =over 4
842
843 =item Arguments: C<$coderef>
844
845 =item Return Value: The return value of $coderef
846
847 =back
848
849 Storage specific method to run the code ref with FK checks deferred or
850 in MySQL's case disabled entirely.
851
852 =cut
853
854 # Storage subclasses should override this
855 sub with_deferred_fk_checks {
856   my ($self, $sub) = @_;
857   $sub->();
858 }
859
860 =head2 connected
861
862 =over
863
864 =item Arguments: none
865
866 =item Return Value: 1|0
867
868 =back
869
870 Verifies that the current database handle is active and ready to execute
871 an SQL statement (e.g. the connection did not get stale, server is still
872 answering, etc.) This method is used internally by L</dbh>.
873
874 =cut
875
876 sub connected {
877   my $self = shift;
878   return 0 unless $self->_seems_connected;
879
880   #be on the safe side
881   local $self->_dbh->{RaiseError} = 1;
882
883   return $self->_ping;
884 }
885
886 sub _seems_connected {
887   my $self = shift;
888
889   $self->_verify_pid;
890
891   my $dbh = $self->_dbh
892     or return 0;
893
894   return $dbh->FETCH('Active');
895 }
896
897 sub _ping {
898   my $self = shift;
899
900   my $dbh = $self->_dbh or return 0;
901
902   return $dbh->ping;
903 }
904
905 sub ensure_connected {
906   my ($self) = @_;
907
908   unless ($self->connected) {
909     $self->_populate_dbh;
910   }
911 }
912
913 =head2 dbh
914
915 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
916 is guaranteed to be healthy by implicitly calling L</connected>, and if
917 necessary performing a reconnection before returning. Keep in mind that this
918 is very B<expensive> on some database engines. Consider using L</dbh_do>
919 instead.
920
921 =cut
922
923 sub dbh {
924   my ($self) = @_;
925
926   if (not $self->_dbh) {
927     $self->_populate_dbh;
928   } else {
929     $self->ensure_connected;
930   }
931   return $self->_dbh;
932 }
933
934 # this is the internal "get dbh or connect (don't check)" method
935 sub _get_dbh {
936   my $self = shift;
937   $self->_verify_pid;
938   $self->_populate_dbh unless $self->_dbh;
939   return $self->_dbh;
940 }
941
942 sub sql_maker {
943   my ($self) = @_;
944   unless ($self->_sql_maker) {
945     my $sql_maker_class = $self->sql_maker_class;
946
947     my %opts = %{$self->_sql_maker_opts||{}};
948     my $dialect =
949       $opts{limit_dialect}
950         ||
951       $self->sql_limit_dialect
952         ||
953       do {
954         my $s_class = (ref $self) || $self;
955         carp (
956           "Your storage class ($s_class) does not set sql_limit_dialect and you "
957         . 'have not supplied an explicit limit_dialect in your connection_info. '
958         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
959         . 'databases but can be (and often is) painfully slow. '
960         . "Please file an RT ticket against '$s_class' ."
961         );
962
963         'GenericSubQ';
964       }
965     ;
966
967     my ($quote_char, $name_sep);
968
969     if ($opts{quote_names}) {
970       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
971         my $s_class = (ref $self) || $self;
972         carp (
973           "You requested 'quote_names' but your storage class ($s_class) does "
974         . 'not explicitly define a default sql_quote_char and you have not '
975         . 'supplied a quote_char as part of your connection_info. DBIC will '
976         .q{default to the ANSI SQL standard quote '"', which works most of }
977         . "the time. Please file an RT ticket against '$s_class'."
978         );
979
980         '"'; # RV
981       };
982
983       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
984     }
985
986     $self->_sql_maker($sql_maker_class->new(
987       bindtype=>'columns',
988       array_datatypes => 1,
989       limit_dialect => $dialect,
990       ($quote_char ? (quote_char => $quote_char) : ()),
991       name_sep => ($name_sep || '.'),
992       %opts,
993     ));
994   }
995   return $self->_sql_maker;
996 }
997
998 # nothing to do by default
999 sub _rebless {}
1000 sub _init {}
1001
1002 sub _populate_dbh {
1003   my ($self) = @_;
1004
1005   my @info = @{$self->_dbi_connect_info || []};
1006   $self->_dbh(undef); # in case ->connected failed we might get sent here
1007   $self->_dbh_details({}); # reset everything we know
1008
1009   $self->_dbh($self->_connect(@info));
1010
1011   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1012
1013   $self->_determine_driver;
1014
1015   # Always set the transaction depth on connect, since
1016   #  there is no transaction in progress by definition
1017   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1018
1019   $self->_run_connection_actions unless $self->{_in_determine_driver};
1020 }
1021
1022 sub _run_connection_actions {
1023   my $self = shift;
1024   my @actions;
1025
1026   push @actions, ( $self->on_connect_call || () );
1027   push @actions, $self->_parse_connect_do ('on_connect_do');
1028
1029   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1030 }
1031
1032
1033
1034 sub set_use_dbms_capability {
1035   $_[0]->set_inherited ($_[1], $_[2]);
1036 }
1037
1038 sub get_use_dbms_capability {
1039   my ($self, $capname) = @_;
1040
1041   my $use = $self->get_inherited ($capname);
1042   return defined $use
1043     ? $use
1044     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1045   ;
1046 }
1047
1048 sub set_dbms_capability {
1049   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1050 }
1051
1052 sub get_dbms_capability {
1053   my ($self, $capname) = @_;
1054
1055   my $cap = $self->_dbh_details->{capability}{$capname};
1056
1057   unless (defined $cap) {
1058     if (my $meth = $self->can ("_determine$capname")) {
1059       $cap = $self->$meth ? 1 : 0;
1060     }
1061     else {
1062       $cap = 0;
1063     }
1064
1065     $self->set_dbms_capability ($capname, $cap);
1066   }
1067
1068   return $cap;
1069 }
1070
1071 sub _server_info {
1072   my $self = shift;
1073
1074   my $info;
1075   unless ($info = $self->_dbh_details->{info}) {
1076
1077     $info = {};
1078
1079     my $server_version = try { $self->_get_server_version };
1080
1081     if (defined $server_version) {
1082       $info->{dbms_version} = $server_version;
1083
1084       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1085       my @verparts = split (/\./, $numeric_version);
1086       if (
1087         @verparts
1088           &&
1089         $verparts[0] <= 999
1090       ) {
1091         # consider only up to 3 version parts, iff not more than 3 digits
1092         my @use_parts;
1093         while (@verparts && @use_parts < 3) {
1094           my $p = shift @verparts;
1095           last if $p > 999;
1096           push @use_parts, $p;
1097         }
1098         push @use_parts, 0 while @use_parts < 3;
1099
1100         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1101       }
1102     }
1103
1104     $self->_dbh_details->{info} = $info;
1105   }
1106
1107   return $info;
1108 }
1109
1110 sub _get_server_version {
1111   shift->_dbh_get_info(18);
1112 }
1113
1114 sub _dbh_get_info {
1115   my ($self, $info) = @_;
1116
1117   return try { $self->_get_dbh->get_info($info) } || undef;
1118 }
1119
1120 sub _determine_driver {
1121   my ($self) = @_;
1122
1123   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1124     my $started_connected = 0;
1125     local $self->{_in_determine_driver} = 1;
1126
1127     if (ref($self) eq __PACKAGE__) {
1128       my $driver;
1129       if ($self->_dbh) { # we are connected
1130         $driver = $self->_dbh->{Driver}{Name};
1131         $started_connected = 1;
1132       } else {
1133         # if connect_info is a CODEREF, we have no choice but to connect
1134         if (ref $self->_dbi_connect_info->[0] &&
1135             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1136           $self->_populate_dbh;
1137           $driver = $self->_dbh->{Driver}{Name};
1138         }
1139         else {
1140           # try to use dsn to not require being connected, the driver may still
1141           # force a connection in _rebless to determine version
1142           # (dsn may not be supplied at all if all we do is make a mock-schema)
1143           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1144           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1145           $driver ||= $ENV{DBI_DRIVER};
1146         }
1147       }
1148
1149       if ($driver) {
1150         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1151         if ($self->load_optional_class($storage_class)) {
1152           mro::set_mro($storage_class, 'c3');
1153           bless $self, $storage_class;
1154           $self->_rebless();
1155         }
1156       }
1157     }
1158
1159     $self->_driver_determined(1);
1160
1161     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1162
1163     $self->_init; # run driver-specific initializations
1164
1165     $self->_run_connection_actions
1166         if !$started_connected && defined $self->_dbh;
1167   }
1168 }
1169
1170 sub _do_connection_actions {
1171   my $self          = shift;
1172   my $method_prefix = shift;
1173   my $call          = shift;
1174
1175   if (not ref($call)) {
1176     my $method = $method_prefix . $call;
1177     $self->$method(@_);
1178   } elsif (ref($call) eq 'CODE') {
1179     $self->$call(@_);
1180   } elsif (ref($call) eq 'ARRAY') {
1181     if (ref($call->[0]) ne 'ARRAY') {
1182       $self->_do_connection_actions($method_prefix, $_) for @$call;
1183     } else {
1184       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1185     }
1186   } else {
1187     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1188   }
1189
1190   return $self;
1191 }
1192
1193 sub connect_call_do_sql {
1194   my $self = shift;
1195   $self->_do_query(@_);
1196 }
1197
1198 sub disconnect_call_do_sql {
1199   my $self = shift;
1200   $self->_do_query(@_);
1201 }
1202
1203 # override in db-specific backend when necessary
1204 sub connect_call_datetime_setup { 1 }
1205
1206 sub _do_query {
1207   my ($self, $action) = @_;
1208
1209   if (ref $action eq 'CODE') {
1210     $action = $action->($self);
1211     $self->_do_query($_) foreach @$action;
1212   }
1213   else {
1214     # Most debuggers expect ($sql, @bind), so we need to exclude
1215     # the attribute hash which is the second argument to $dbh->do
1216     # furthermore the bind values are usually to be presented
1217     # as named arrayref pairs, so wrap those here too
1218     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1219     my $sql = shift @do_args;
1220     my $attrs = shift @do_args;
1221     my @bind = map { [ undef, $_ ] } @do_args;
1222
1223     $self->_query_start($sql, \@bind);
1224     $self->_get_dbh->do($sql, $attrs, @do_args);
1225     $self->_query_end($sql, \@bind);
1226   }
1227
1228   return $self;
1229 }
1230
1231 sub _connect {
1232   my ($self, @info) = @_;
1233
1234   $self->throw_exception("You failed to provide any connection info")
1235     if !@info;
1236
1237   my ($old_connect_via, $dbh);
1238
1239   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1240     $old_connect_via = $DBI::connect_via;
1241     $DBI::connect_via = 'connect';
1242   }
1243
1244   try {
1245     if(ref $info[0] eq 'CODE') {
1246       $dbh = $info[0]->();
1247     }
1248     else {
1249       require DBI;
1250       $dbh = DBI->connect(@info);
1251     }
1252
1253     if (!$dbh) {
1254       die $DBI::errstr;
1255     }
1256
1257     unless ($self->unsafe) {
1258
1259       $self->throw_exception(
1260         'Refusing clobbering of {HandleError} installed on externally supplied '
1261        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1262       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1263
1264       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1265       # request, or an external handle. Complain and set anyway
1266       unless ($dbh->{RaiseError}) {
1267         carp( ref $info[0] eq 'CODE'
1268
1269           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1270            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1271            .'attribute has been supplied'
1272
1273           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1274            .'unsafe => 1. Toggling RaiseError back to true'
1275         );
1276
1277         $dbh->{RaiseError} = 1;
1278       }
1279
1280       # this odd anonymous coderef dereference is in fact really
1281       # necessary to avoid the unwanted effect described in perl5
1282       # RT#75792
1283       sub {
1284         my $weak_self = $_[0];
1285         weaken $weak_self;
1286
1287         # the coderef is blessed so we can distinguish it from externally
1288         # supplied handles (which must be preserved)
1289         $_[1]->{HandleError} = bless sub {
1290           if ($weak_self) {
1291             $weak_self->throw_exception("DBI Exception: $_[0]");
1292           }
1293           else {
1294             # the handler may be invoked by something totally out of
1295             # the scope of DBIC
1296             DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1297           }
1298         }, '__DBIC__DBH__ERROR__HANDLER__';
1299       }->($self, $dbh);
1300     }
1301   }
1302   catch {
1303     $self->throw_exception("DBI Connection failed: $_")
1304   }
1305   finally {
1306     $DBI::connect_via = $old_connect_via if $old_connect_via;
1307   };
1308
1309   $self->_dbh_autocommit($dbh->{AutoCommit});
1310   $dbh;
1311 }
1312
1313 sub txn_begin {
1314   my $self = shift;
1315
1316   # this means we have not yet connected and do not know the AC status
1317   # (e.g. coderef $dbh), need a full-fledged connection check
1318   if (! defined $self->_dbh_autocommit) {
1319     $self->ensure_connected;
1320   }
1321   # Otherwise simply connect or re-connect on pid changes
1322   else {
1323     $self->_get_dbh;
1324   }
1325
1326   $self->next::method(@_);
1327 }
1328
1329 sub _exec_txn_begin {
1330   my $self = shift;
1331
1332   # if the user is utilizing txn_do - good for him, otherwise we need to
1333   # ensure that the $dbh is healthy on BEGIN.
1334   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1335   # will be replaced by a failure of begin_work itself (which will be
1336   # then retried on reconnect)
1337   if ($self->{_in_do_block}) {
1338     $self->_dbh->begin_work;
1339   } else {
1340     $self->dbh_do(sub { $_[1]->begin_work });
1341   }
1342 }
1343
1344 sub txn_commit {
1345   my $self = shift;
1346
1347   $self->_verify_pid if $self->_dbh;
1348   $self->throw_exception("Unable to txn_commit() on a disconnected storage")
1349     unless $self->_dbh;
1350
1351   # esoteric case for folks using external $dbh handles
1352   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1353     carp "Storage transaction_depth 0 does not match "
1354         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1355     $self->transaction_depth(1);
1356   }
1357
1358   $self->next::method(@_);
1359
1360   # if AutoCommit is disabled txn_depth never goes to 0
1361   # as a new txn is started immediately on commit
1362   $self->transaction_depth(1) if (
1363     !$self->transaction_depth
1364       and
1365     defined $self->_dbh_autocommit
1366       and
1367     ! $self->_dbh_autocommit
1368   );
1369 }
1370
1371 sub _exec_txn_commit {
1372   shift->_dbh->commit;
1373 }
1374
1375 sub txn_rollback {
1376   my $self = shift;
1377
1378   $self->_verify_pid if $self->_dbh;
1379   $self->throw_exception("Unable to txn_rollback() on a disconnected storage")
1380     unless $self->_dbh;
1381
1382   # esoteric case for folks using external $dbh handles
1383   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1384     carp "Storage transaction_depth 0 does not match "
1385         ."false AutoCommit of $self->{_dbh}, attempting ROLLBACK anyway";
1386     $self->transaction_depth(1);
1387   }
1388
1389   $self->next::method(@_);
1390
1391   # if AutoCommit is disabled txn_depth never goes to 0
1392   # as a new txn is started immediately on commit
1393   $self->transaction_depth(1) if (
1394     !$self->transaction_depth
1395       and
1396     defined $self->_dbh_autocommit
1397       and
1398     ! $self->_dbh_autocommit
1399   );
1400 }
1401
1402 sub _exec_txn_rollback {
1403   shift->_dbh->rollback;
1404 }
1405
1406 # generate some identical methods
1407 for my $meth (qw/svp_begin svp_release svp_rollback/) {
1408   no strict qw/refs/;
1409   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
1410     my $self = shift;
1411     $self->_verify_pid if $self->_dbh;
1412     $self->throw_exception("Unable to $meth() on a disconnected storage")
1413       unless $self->_dbh;
1414     $self->next::method(@_);
1415   };
1416 }
1417
1418 # This used to be the top-half of _execute.  It was split out to make it
1419 #  easier to override in NoBindVars without duping the rest.  It takes up
1420 #  all of _execute's args, and emits $sql, @bind.
1421 sub _prep_for_execute {
1422   #my ($self, $op, $ident, $args) = @_;
1423   return shift->_gen_sql_bind(@_)
1424 }
1425
1426 sub _gen_sql_bind {
1427   my ($self, $op, $ident, $args) = @_;
1428
1429   my ($sql, @bind) = $self->sql_maker->$op(
1430     blessed($ident) ? $ident->from : $ident,
1431     @$args,
1432   );
1433
1434   if (
1435     ! $ENV{DBIC_DT_SEARCH_OK}
1436       and
1437     $op eq 'select'
1438       and
1439     first { blessed($_->[1]) && $_->[1]->isa('DateTime') } @bind
1440   ) {
1441     carp_unique 'DateTime objects passed to search() are not supported '
1442       . 'properly (InflateColumn::DateTime formats and settings are not '
1443       . 'respected.) See "Formatting DateTime objects in queries" in '
1444       . 'DBIx::Class::Manual::Cookbook. To disable this warning for good '
1445       . 'set $ENV{DBIC_DT_SEARCH_OK} to true'
1446   }
1447
1448   return( $sql, $self->_resolve_bindattrs(
1449     $ident, [ @{$args->[2]{bind}||[]}, @bind ]
1450   ));
1451 }
1452
1453 sub _resolve_bindattrs {
1454   my ($self, $ident, $bind, $colinfos) = @_;
1455
1456   $colinfos ||= {};
1457
1458   my $resolve_bindinfo = sub {
1459     #my $infohash = shift;
1460
1461     %$colinfos = %{ $self->_resolve_column_info($ident) }
1462       unless keys %$colinfos;
1463
1464     my $ret;
1465     if (my $col = $_[0]->{dbic_colname}) {
1466       $ret = { %{$_[0]} };
1467
1468       $ret->{sqlt_datatype} ||= $colinfos->{$col}{data_type}
1469         if $colinfos->{$col}{data_type};
1470
1471       $ret->{sqlt_size} ||= $colinfos->{$col}{size}
1472         if $colinfos->{$col}{size};
1473     }
1474
1475     $ret || $_[0];
1476   };
1477
1478   return [ map {
1479     if (ref $_ ne 'ARRAY') {
1480       [{}, $_]
1481     }
1482     elsif (! defined $_->[0]) {
1483       [{}, $_->[1]]
1484     }
1485     elsif (ref $_->[0] eq 'HASH') {
1486       [
1487         ($_->[0]{dbd_attrs} or $_->[0]{sqlt_datatype}) ? $_->[0] : $resolve_bindinfo->($_->[0]),
1488         $_->[1]
1489       ]
1490     }
1491     elsif (ref $_->[0] eq 'SCALAR') {
1492       [ { sqlt_datatype => ${$_->[0]} }, $_->[1] ]
1493     }
1494     else {
1495       [ $resolve_bindinfo->({ dbic_colname => $_->[0] }), $_->[1] ]
1496     }
1497   } @$bind ];
1498 }
1499
1500 sub _format_for_trace {
1501   #my ($self, $bind) = @_;
1502
1503   ### Turn @bind from something like this:
1504   ###   ( [ "artist", 1 ], [ \%attrs, 3 ] )
1505   ### to this:
1506   ###   ( "'1'", "'3'" )
1507
1508   map {
1509     defined( $_ && $_->[1] )
1510       ? qq{'$_->[1]'}
1511       : q{NULL}
1512   } @{$_[1] || []};
1513 }
1514
1515 sub _query_start {
1516   my ( $self, $sql, $bind ) = @_;
1517
1518   $self->debugobj->query_start( $sql, $self->_format_for_trace($bind) )
1519     if $self->debug;
1520 }
1521
1522 sub _query_end {
1523   my ( $self, $sql, $bind ) = @_;
1524
1525   $self->debugobj->query_end( $sql, $self->_format_for_trace($bind) )
1526     if $self->debug;
1527 }
1528
1529 my $sba_compat;
1530 sub _dbi_attrs_for_bind {
1531   my ($self, $ident, $bind) = @_;
1532
1533   if (! defined $sba_compat) {
1534     $self->_determine_driver;
1535     $sba_compat = $self->can('source_bind_attributes') == \&source_bind_attributes
1536       ? 0
1537       : 1
1538     ;
1539   }
1540
1541   my $sba_attrs;
1542   if ($sba_compat) {
1543     my $class = ref $self;
1544     carp_unique (
1545       "The source_bind_attributes() override in $class relies on a deprecated codepath. "
1546      .'You are strongly advised to switch your code to override bind_attribute_by_datatype() '
1547      .'instead. This legacy compat shim will also disappear some time before DBIC 0.09'
1548     );
1549
1550     my $sba_attrs = $self->source_bind_attributes
1551   }
1552
1553   my @attrs;
1554
1555   for (map { $_->[0] } @$bind) {
1556     push @attrs, do {
1557       if (exists $_->{dbd_attrs}) {
1558         $_->{dbd_attrs}
1559       }
1560       elsif($_->{sqlt_datatype}) {
1561         # cache the result in the dbh_details hash, as it can not change unless
1562         # we connect to something else
1563         my $cache = $self->_dbh_details->{_datatype_map_cache} ||= {};
1564         if (not exists $cache->{$_->{sqlt_datatype}}) {
1565           $cache->{$_->{sqlt_datatype}} = $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
1566         }
1567         $cache->{$_->{sqlt_datatype}};
1568       }
1569       elsif ($sba_attrs and $_->{dbic_colname}) {
1570         $sba_attrs->{$_->{dbic_colname}} || undef;
1571       }
1572       else {
1573         undef;  # always push something at this position
1574       }
1575     }
1576   }
1577
1578   return \@attrs;
1579 }
1580
1581 sub _execute {
1582   my ($self, $op, $ident, @args) = @_;
1583
1584   my ($sql, $bind) = $self->_prep_for_execute($op, $ident, \@args);
1585
1586   shift->dbh_do(    # retry over disconnects
1587     '_dbh_execute',
1588     $sql,
1589     $bind,
1590     $self->_dbi_attrs_for_bind($ident, $bind)
1591   );
1592 }
1593
1594 sub _dbh_execute {
1595   my ($self, undef, $sql, $bind, $bind_attrs) = @_;
1596
1597   $self->_query_start( $sql, $bind );
1598   my $sth = $self->_sth($sql);
1599
1600   for my $i (0 .. $#$bind) {
1601     if (ref $bind->[$i][1] eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1602       $sth->bind_param_inout(
1603         $i + 1, # bind params counts are 1-based
1604         $bind->[$i][1],
1605         $bind->[$i][0]{dbd_size} || $self->_max_column_bytesize($bind->[$i][0]), # size
1606         $bind_attrs->[$i],
1607       );
1608     }
1609     else {
1610       $sth->bind_param(
1611         $i + 1,
1612         (ref $bind->[$i][1] and overload::Method($bind->[$i][1], '""'))
1613           ? "$bind->[$i][1]"
1614           : $bind->[$i][1]
1615         ,
1616         $bind_attrs->[$i],
1617       );
1618     }
1619   }
1620
1621   # Can this fail without throwing an exception anyways???
1622   my $rv = $sth->execute();
1623   $self->throw_exception(
1624     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1625   ) if !$rv;
1626
1627   $self->_query_end( $sql, $bind );
1628
1629   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1630 }
1631
1632 sub _prefetch_autovalues {
1633   my ($self, $source, $to_insert) = @_;
1634
1635   my $colinfo = $source->columns_info;
1636
1637   my %values;
1638   for my $col (keys %$colinfo) {
1639     if (
1640       $colinfo->{$col}{auto_nextval}
1641         and
1642       (
1643         ! exists $to_insert->{$col}
1644           or
1645         ref $to_insert->{$col} eq 'SCALAR'
1646           or
1647         (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1648       )
1649     ) {
1650       $values{$col} = $self->_sequence_fetch(
1651         'NEXTVAL',
1652         ( $colinfo->{$col}{sequence} ||=
1653             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1654         ),
1655       );
1656     }
1657   }
1658
1659   \%values;
1660 }
1661
1662 sub insert {
1663   my ($self, $source, $to_insert) = @_;
1664
1665   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1666
1667   # fuse the values, but keep a separate list of prefetched_values so that
1668   # they can be fused once again with the final return
1669   $to_insert = { %$to_insert, %$prefetched_values };
1670
1671   my $col_infos = $source->columns_info;
1672   my %pcols = map { $_ => 1 } $source->primary_columns;
1673   my %retrieve_cols;
1674   for my $col ($source->columns) {
1675     # nothing to retrieve when explicit values are supplied
1676     next if (defined $to_insert->{$col} and ! (
1677       ref $to_insert->{$col} eq 'SCALAR'
1678         or
1679       (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1680     ));
1681
1682     # the 'scalar keys' is a trick to preserve the ->columns declaration order
1683     $retrieve_cols{$col} = scalar keys %retrieve_cols if (
1684       $pcols{$col}
1685         or
1686       $col_infos->{$col}{retrieve_on_insert}
1687     );
1688   };
1689
1690   my ($sqla_opts, @ir_container);
1691   if (%retrieve_cols and $self->_use_insert_returning) {
1692     $sqla_opts->{returning_container} = \@ir_container
1693       if $self->_use_insert_returning_bound;
1694
1695     $sqla_opts->{returning} = [
1696       sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
1697     ];
1698   }
1699
1700   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
1701
1702   my %returned_cols = %$to_insert;
1703   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
1704     @ir_container = try {
1705       local $SIG{__WARN__} = sub {};
1706       my @r = $sth->fetchrow_array;
1707       $sth->finish;
1708       @r;
1709     } unless @ir_container;
1710
1711     @returned_cols{@$retlist} = @ir_container if @ir_container;
1712   }
1713   else {
1714     # pull in PK if needed and then everything else
1715     if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
1716
1717       $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
1718         unless $self->can('last_insert_id');
1719
1720       my @pri_values = $self->last_insert_id($source, @missing_pri);
1721
1722       $self->throw_exception( "Can't get last insert id" )
1723         unless (@pri_values == @missing_pri);
1724
1725       @returned_cols{@missing_pri} = @pri_values;
1726       delete $retrieve_cols{$_} for @missing_pri;
1727     }
1728
1729     # if there is more left to pull
1730     if (%retrieve_cols) {
1731       $self->throw_exception(
1732         'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
1733       ) unless %pcols;
1734
1735       my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
1736
1737       my $cur = DBIx::Class::ResultSet->new($source, {
1738         where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
1739         select => \@left_to_fetch,
1740       })->cursor;
1741
1742       @returned_cols{@left_to_fetch} = $cur->next;
1743
1744       $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
1745         if scalar $cur->next;
1746     }
1747   }
1748
1749   return { %$prefetched_values, %returned_cols };
1750 }
1751
1752 sub insert_bulk {
1753   my ($self, $source, $cols, $data) = @_;
1754
1755   my @col_range = (0..$#$cols);
1756
1757   # FIXME - perhaps this is not even needed? does DBI stringify?
1758   #
1759   # forcibly stringify whatever is stringifiable
1760   # ResultSet::populate() hands us a copy - safe to mangle
1761   for my $r (0 .. $#$data) {
1762     for my $c (0 .. $#{$data->[$r]}) {
1763       $data->[$r][$c] = "$data->[$r][$c]"
1764         if ( ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
1765     }
1766   }
1767
1768   my $colinfo_cache = {}; # since we will run _resolve_bindattrs on the same $source a lot
1769
1770   # get a slice type index based on first row of data
1771   # a "column" in this context may refer to more than one bind value
1772   # e.g. \[ '?, ?', [...], [...] ]
1773   #
1774   # construct the value type index - a description of values types for every
1775   # per-column slice of $data:
1776   #
1777   # nonexistent - nonbind literal
1778   # 0 - regular value
1779   # [] of bindattrs - resolved attribute(s) of bind(s) passed via literal+bind \[] combo
1780   #
1781   # also construct the column hash to pass to the SQL generator. For plain
1782   # (non literal) values - convert the members of the first row into a
1783   # literal+bind combo, with extra positional info in the bind attr hashref.
1784   # This will allow us to match the order properly, and is so contrived
1785   # because a user-supplied literal/bind (or something else specific to a
1786   # resultsource and/or storage driver) can inject extra binds along the
1787   # way, so one can't rely on "shift positions" ordering at all. Also we
1788   # can't just hand SQLA a set of some known "values" (e.g. hashrefs that
1789   # can be later matched up by address), because we want to supply a real
1790   # value on which perhaps e.g. datatype checks will be performed
1791   my ($proto_data, $value_type_idx);
1792   for my $i (@col_range) {
1793     my $colname = $cols->[$i];
1794     if (ref $data->[0][$i] eq 'SCALAR') {
1795       # no bind value at all - no type
1796
1797       $proto_data->{$colname} = $data->[0][$i];
1798     }
1799     elsif (ref $data->[0][$i] eq 'REF' and ref ${$data->[0][$i]} eq 'ARRAY' ) {
1800       # repack, so we don't end up mangling the original \[]
1801       my ($sql, @bind) = @${$data->[0][$i]};
1802
1803       # normalization of user supplied stuff
1804       my $resolved_bind = $self->_resolve_bindattrs(
1805         $source, \@bind, $colinfo_cache,
1806       );
1807
1808       # store value-less (attrs only) bind info - we will be comparing all
1809       # supplied binds against this for sanity
1810       $value_type_idx->{$i} = [ map { $_->[0] } @$resolved_bind ];
1811
1812       $proto_data->{$colname} = \[ $sql, map { [
1813         # inject slice order to use for $proto_bind construction
1814           { %{$resolved_bind->[$_][0]}, _bind_data_slice_idx => $i }
1815             =>
1816           $resolved_bind->[$_][1]
1817         ] } (0 .. $#bind)
1818       ];
1819     }
1820     else {
1821       $value_type_idx->{$i} = 0;
1822
1823       $proto_data->{$colname} = \[ '?', [
1824         { dbic_colname => $colname, _bind_data_slice_idx => $i }
1825           =>
1826         $data->[0][$i]
1827       ] ];
1828     }
1829   }
1830
1831   my ($sql, $proto_bind) = $self->_prep_for_execute (
1832     'insert',
1833     $source,
1834     [ $proto_data ],
1835   );
1836
1837   if (! @$proto_bind and keys %$value_type_idx) {
1838     # if the bindlist is empty and we had some dynamic binds, this means the
1839     # storage ate them away (e.g. the NoBindVars component) and interpolated
1840     # them directly into the SQL. This obviously can't be good for multi-inserts
1841     $self->throw_exception('Cannot insert_bulk without support for placeholders');
1842   }
1843
1844   # sanity checks
1845   # FIXME - devise a flag "no babysitting" or somesuch to shut this off
1846   #
1847   # use an error reporting closure for convenience (less to pass)
1848   my $bad_slice_report_cref = sub {
1849     my ($msg, $r_idx, $c_idx) = @_;
1850     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1851       $msg,
1852       $cols->[$c_idx],
1853       do {
1854         require Data::Dumper::Concise;
1855         local $Data::Dumper::Maxdepth = 5;
1856         Data::Dumper::Concise::Dumper ({
1857           map { $cols->[$_] =>
1858             $data->[$r_idx][$_]
1859           } @col_range
1860         }),
1861       }
1862     );
1863   };
1864
1865   for my $col_idx (@col_range) {
1866     my $reference_val = $data->[0][$col_idx];
1867
1868     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
1869       my $val = $data->[$row_idx][$col_idx];
1870
1871       if (! exists $value_type_idx->{$col_idx}) { # literal no binds
1872         if (ref $val ne 'SCALAR') {
1873           $bad_slice_report_cref->(
1874             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
1875             $row_idx,
1876             $col_idx,
1877           );
1878         }
1879         elsif ($$val ne $$reference_val) {
1880           $bad_slice_report_cref->(
1881             "Inconsistent literal SQL value (expecting \\'$$reference_val')",
1882             $row_idx,
1883             $col_idx,
1884           );
1885         }
1886       }
1887       elsif (! $value_type_idx->{$col_idx} ) {  # regular non-literal value
1888         if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
1889           $bad_slice_report_cref->("Literal SQL found where a plain bind value is expected", $row_idx, $col_idx);
1890         }
1891       }
1892       else {  # binds from a \[], compare type and attrs
1893         if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
1894           $bad_slice_report_cref->(
1895             "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
1896             $row_idx,
1897             $col_idx,
1898           );
1899         }
1900         # start drilling down and bail out early on identical refs
1901         elsif (
1902           $reference_val != $val
1903             or
1904           $$reference_val != $$val
1905         ) {
1906           if (${$val}->[0] ne ${$reference_val}->[0]) {
1907             $bad_slice_report_cref->(
1908               "Inconsistent literal/bind SQL (expecting \\['${$reference_val}->[0]', ... ])",
1909               $row_idx,
1910               $col_idx,
1911             );
1912           }
1913           # need to check the bind attrs - a bind will happen only once for
1914           # the entire dataset, so any changes further down will be ignored.
1915           elsif (! Data::Compare::Compare(
1916             $value_type_idx->{$col_idx},
1917             [
1918               map
1919               { $_->[0] }
1920               @{$self->_resolve_bindattrs(
1921                 $source, [ @{$$val}[1 .. $#$$val] ], $colinfo_cache,
1922               )}
1923             ],
1924           )) {
1925             $bad_slice_report_cref->(
1926               'Differing bind attributes on literal/bind values not supported',
1927               $row_idx,
1928               $col_idx,
1929             );
1930           }
1931         }
1932       }
1933     }
1934   }
1935
1936   # neither _dbh_execute_for_fetch, nor _dbh_execute_inserts_with_no_binds
1937   # are atomic (even if execute_for_fetch is a single call). Thus a safety
1938   # scope guard
1939   my $guard = $self->txn_scope_guard;
1940
1941   $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
1942   my $sth = $self->_sth($sql);
1943   my $rv = do {
1944     if (@$proto_bind) {
1945       # proto bind contains the information on which pieces of $data to pull
1946       # $cols is passed in only for prettier error-reporting
1947       $self->_dbh_execute_for_fetch( $source, $sth, $proto_bind, $cols, $data );
1948     }
1949     else {
1950       # bind_param_array doesn't work if there are no binds
1951       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1952     }
1953   };
1954
1955   $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
1956
1957   $guard->commit;
1958
1959   return (wantarray ? ($rv, $sth, @$proto_bind) : $rv);
1960 }
1961
1962 # execute_for_fetch is capable of returning data just fine (it means it
1963 # can be used for INSERT...RETURNING and UPDATE...RETURNING. Since this
1964 # is the void-populate fast-path we will just ignore this altogether
1965 # for the time being.
1966 sub _dbh_execute_for_fetch {
1967   my ($self, $source, $sth, $proto_bind, $cols, $data) = @_;
1968
1969   my @idx_range = ( 0 .. $#$proto_bind );
1970
1971   # If we have any bind attributes to take care of, we will bind the
1972   # proto-bind data (which will never be used by execute_for_fetch)
1973   # However since column bindtypes are "sticky", this is sufficient
1974   # to get the DBD to apply the bindtype to all values later on
1975
1976   my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
1977
1978   for my $i (@idx_range) {
1979     $sth->bind_param (
1980       $i+1, # DBI bind indexes are 1-based
1981       $proto_bind->[$i][1],
1982       $bind_attrs->[$i],
1983     ) if defined $bind_attrs->[$i];
1984   }
1985
1986   # At this point $data slots named in the _bind_data_slice_idx of
1987   # each piece of $proto_bind are either \[]s or plain values to be
1988   # passed in. Construct the dispensing coderef. *NOTE* the order
1989   # of $data will differ from this of the ?s in the SQL (due to
1990   # alphabetical ordering by colname). We actually do want to
1991   # preserve this behavior so that prepare_cached has a better
1992   # chance of matching on unrelated calls
1993   my %data_reorder = map { $proto_bind->[$_][0]{_bind_data_slice_idx} => $_ } @idx_range;
1994
1995   my $fetch_row_idx = -1; # saner loop this way
1996   my $fetch_tuple = sub {
1997     return undef if ++$fetch_row_idx > $#$data;
1998
1999     return [ map
2000       { (ref $_ eq 'REF' and ref $$_ eq 'ARRAY')
2001         ? map { $_->[-1] } @{$$_}[1 .. $#$$_]
2002         : $_
2003       }
2004       map
2005         { $data->[$fetch_row_idx][$_]}
2006         sort
2007           { $data_reorder{$a} <=> $data_reorder{$b} }
2008           keys %data_reorder
2009     ];
2010   };
2011
2012   my $tuple_status = [];
2013   my ($rv, $err);
2014   try {
2015     $rv = $sth->execute_for_fetch(
2016       $fetch_tuple,
2017       $tuple_status,
2018     );
2019   }
2020   catch {
2021     $err = shift;
2022   };
2023
2024   # Not all DBDs are create equal. Some throw on error, some return
2025   # an undef $rv, and some set $sth->err - try whatever we can
2026   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
2027     ! defined $err
2028       and
2029     ( !defined $rv or $sth->err )
2030   );
2031
2032   # Statement must finish even if there was an exception.
2033   try {
2034     $sth->finish
2035   }
2036   catch {
2037     $err = shift unless defined $err
2038   };
2039
2040   if (defined $err) {
2041     my $i = 0;
2042     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
2043
2044     $self->throw_exception("Unexpected populate error: $err")
2045       if ($i > $#$tuple_status);
2046
2047     require Data::Dumper::Concise;
2048     $self->throw_exception(sprintf "execute_for_fetch() aborted with '%s' at populate slice:\n%s",
2049       ($tuple_status->[$i][1] || $err),
2050       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
2051     );
2052   }
2053
2054   return $rv;
2055 }
2056
2057 sub _dbh_execute_inserts_with_no_binds {
2058   my ($self, $sth, $count) = @_;
2059
2060   my $err;
2061   try {
2062     my $dbh = $self->_get_dbh;
2063     local $dbh->{RaiseError} = 1;
2064     local $dbh->{PrintError} = 0;
2065
2066     $sth->execute foreach 1..$count;
2067   }
2068   catch {
2069     $err = shift;
2070   };
2071
2072   # Make sure statement is finished even if there was an exception.
2073   try {
2074     $sth->finish
2075   }
2076   catch {
2077     $err = shift unless defined $err;
2078   };
2079
2080   $self->throw_exception($err) if defined $err;
2081
2082   return $count;
2083 }
2084
2085 sub update {
2086   #my ($self, $source, @args) = @_;
2087   shift->_execute('update', @_);
2088 }
2089
2090
2091 sub delete {
2092   #my ($self, $source, @args) = @_;
2093   shift->_execute('delete', @_);
2094 }
2095
2096 # We were sent here because the $rs contains a complex search
2097 # which will require a subquery to select the correct rows
2098 # (i.e. joined or limited resultsets, or non-introspectable conditions)
2099 #
2100 # Generating a single PK column subquery is trivial and supported
2101 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
2102 # Look at _multipk_update_delete()
2103 sub _subq_update_delete {
2104   my $self = shift;
2105   my ($rs, $op, $values) = @_;
2106
2107   my $rsrc = $rs->result_source;
2108
2109   # quick check if we got a sane rs on our hands
2110   my @pcols = $rsrc->_pri_cols;
2111
2112   my $sel = $rs->_resolved_attrs->{select};
2113   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
2114
2115   if (
2116       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
2117         ne
2118       join ("\x00", sort @$sel )
2119   ) {
2120     $self->throw_exception (
2121       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
2122     );
2123   }
2124
2125   if (@pcols == 1) {
2126     return $self->$op (
2127       $rsrc,
2128       $op eq 'update' ? $values : (),
2129       { $pcols[0] => { -in => $rs->as_query } },
2130     );
2131   }
2132
2133   else {
2134     return $self->_multipk_update_delete (@_);
2135   }
2136 }
2137
2138 # ANSI SQL does not provide a reliable way to perform a multicol-PK
2139 # resultset update/delete involving subqueries. So by default resort
2140 # to simple (and inefficient) delete_all style per-row opearations,
2141 # while allowing specific storages to override this with a faster
2142 # implementation.
2143 #
2144 sub _multipk_update_delete {
2145   return shift->_per_row_update_delete (@_);
2146 }
2147
2148 # This is the default loop used to delete/update rows for multi PK
2149 # resultsets, and used by mysql exclusively (because it can't do anything
2150 # else).
2151 #
2152 # We do not use $row->$op style queries, because resultset update/delete
2153 # is not expected to cascade (this is what delete_all/update_all is for).
2154 #
2155 # There should be no race conditions as the entire operation is rolled
2156 # in a transaction.
2157 #
2158 sub _per_row_update_delete {
2159   my $self = shift;
2160   my ($rs, $op, $values) = @_;
2161
2162   my $rsrc = $rs->result_source;
2163   my @pcols = $rsrc->_pri_cols;
2164
2165   my $guard = $self->txn_scope_guard;
2166
2167   # emulate the return value of $sth->execute for non-selects
2168   my $row_cnt = '0E0';
2169
2170   my $subrs_cur = $rs->cursor;
2171   my @all_pk = $subrs_cur->all;
2172   for my $pks ( @all_pk) {
2173
2174     my $cond;
2175     for my $i (0.. $#pcols) {
2176       $cond->{$pcols[$i]} = $pks->[$i];
2177     }
2178
2179     $self->$op (
2180       $rsrc,
2181       $op eq 'update' ? $values : (),
2182       $cond,
2183     );
2184
2185     $row_cnt++;
2186   }
2187
2188   $guard->commit;
2189
2190   return $row_cnt;
2191 }
2192
2193 sub _select {
2194   my $self = shift;
2195   $self->_execute($self->_select_args(@_));
2196 }
2197
2198 sub _select_args_to_query {
2199   my $self = shift;
2200
2201   # my ($op, $ident, $select, $cond, $rs_attrs, $rows, $offset)
2202   #  = $self->_select_args($ident, $select, $cond, $attrs);
2203   my ($op, $ident, @args) =
2204     $self->_select_args(@_);
2205
2206   # my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2207   my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, \@args);
2208   $prepared_bind ||= [];
2209
2210   return wantarray
2211     ? ($sql, $prepared_bind)
2212     : \[ "($sql)", @$prepared_bind ]
2213   ;
2214 }
2215
2216 sub _select_args {
2217   my ($self, $ident, $select, $where, $attrs) = @_;
2218
2219   my $sql_maker = $self->sql_maker;
2220   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2221
2222   $attrs = {
2223     %$attrs,
2224     select => $select,
2225     from => $ident,
2226     where => $where,
2227     $rs_alias && $alias2source->{$rs_alias}
2228       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2229       : ()
2230     ,
2231   };
2232
2233   # Sanity check the attributes (SQLMaker does it too, but
2234   # in case of a software_limit we'll never reach there)
2235   if (defined $attrs->{offset}) {
2236     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2237       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2238   }
2239
2240   if (defined $attrs->{rows}) {
2241     $self->throw_exception("The rows attribute must be a positive integer if present")
2242       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2243   }
2244   elsif ($attrs->{offset}) {
2245     # MySQL actually recommends this approach.  I cringe.
2246     $attrs->{rows} = $sql_maker->__max_int;
2247   }
2248
2249   my @limit;
2250
2251   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2252   # storage, unless software limit was requested
2253   if (
2254     #limited has_many
2255     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2256        ||
2257     # grouped prefetch (to satisfy group_by == select)
2258     ( $attrs->{group_by}
2259         &&
2260       @{$attrs->{group_by}}
2261         &&
2262       $attrs->{_prefetch_selector_range}
2263     )
2264   ) {
2265     ($ident, $select, $where, $attrs)
2266       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2267   }
2268   elsif (! $attrs->{software_limit} ) {
2269     push @limit, (
2270       $attrs->{rows} || (),
2271       $attrs->{offset} || (),
2272     );
2273   }
2274
2275   # try to simplify the joinmap further (prune unreferenced type-single joins)
2276   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2277
2278 ###
2279   # This would be the point to deflate anything found in $where
2280   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2281   # expect a row object. And all we have is a resultsource (it is trivial
2282   # to extract deflator coderefs via $alias2source above).
2283   #
2284   # I don't see a way forward other than changing the way deflators are
2285   # invoked, and that's just bad...
2286 ###
2287
2288   return ('select', $ident, $select, $where, $attrs, @limit);
2289 }
2290
2291 # Returns a counting SELECT for a simple count
2292 # query. Abstracted so that a storage could override
2293 # this to { count => 'firstcol' } or whatever makes
2294 # sense as a performance optimization
2295 sub _count_select {
2296   #my ($self, $source, $rs_attrs) = @_;
2297   return { count => '*' };
2298 }
2299
2300 sub source_bind_attributes {
2301   shift->throw_exception(
2302     'source_bind_attributes() was never meant to be a callable public method - '
2303    .'please contact the DBIC dev-team and describe your use case so that a reasonable '
2304    .'solution can be provided'
2305    ."\nhttp://search.cpan.org/dist/DBIx-Class/lib/DBIx/Class.pm#GETTING_HELP/SUPPORT"
2306   );
2307 }
2308
2309 =head2 select
2310
2311 =over 4
2312
2313 =item Arguments: $ident, $select, $condition, $attrs
2314
2315 =back
2316
2317 Handle a SQL select statement.
2318
2319 =cut
2320
2321 sub select {
2322   my $self = shift;
2323   my ($ident, $select, $condition, $attrs) = @_;
2324   return $self->cursor_class->new($self, \@_, $attrs);
2325 }
2326
2327 sub select_single {
2328   my $self = shift;
2329   my ($rv, $sth, @bind) = $self->_select(@_);
2330   my @row = $sth->fetchrow_array;
2331   my @nextrow = $sth->fetchrow_array if @row;
2332   if(@row && @nextrow) {
2333     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2334   }
2335   # Need to call finish() to work round broken DBDs
2336   $sth->finish();
2337   return @row;
2338 }
2339
2340 =head2 sql_limit_dialect
2341
2342 This is an accessor for the default SQL limit dialect used by a particular
2343 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2344 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2345 see L<DBIx::Class::SQLMaker::LimitDialects>.
2346
2347 =cut
2348
2349 sub _dbh_sth {
2350   my ($self, $dbh, $sql) = @_;
2351
2352   # 3 is the if_active parameter which avoids active sth re-use
2353   my $sth = $self->disable_sth_caching
2354     ? $dbh->prepare($sql)
2355     : $dbh->prepare_cached($sql, {}, 3);
2356
2357   # XXX You would think RaiseError would make this impossible,
2358   #  but apparently that's not true :(
2359   $self->throw_exception(
2360     $dbh->errstr
2361       ||
2362     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
2363             .'an exception and/or setting $dbh->errstr',
2364       length ($sql) > 20
2365         ? substr($sql, 0, 20) . '...'
2366         : $sql
2367       ,
2368       'DBD::' . $dbh->{Driver}{Name},
2369     )
2370   ) if !$sth;
2371
2372   $sth;
2373 }
2374
2375 sub sth {
2376   carp_unique 'sth was mistakenly marked/documented as public, stop calling it (will be removed before DBIC v0.09)';
2377   shift->_sth(@_);
2378 }
2379
2380 sub _sth {
2381   my ($self, $sql) = @_;
2382   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2383 }
2384
2385 sub _dbh_columns_info_for {
2386   my ($self, $dbh, $table) = @_;
2387
2388   if ($dbh->can('column_info')) {
2389     my %result;
2390     my $caught;
2391     try {
2392       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2393       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2394       $sth->execute();
2395       while ( my $info = $sth->fetchrow_hashref() ){
2396         my %column_info;
2397         $column_info{data_type}   = $info->{TYPE_NAME};
2398         $column_info{size}      = $info->{COLUMN_SIZE};
2399         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2400         $column_info{default_value} = $info->{COLUMN_DEF};
2401         my $col_name = $info->{COLUMN_NAME};
2402         $col_name =~ s/^\"(.*)\"$/$1/;
2403
2404         $result{$col_name} = \%column_info;
2405       }
2406     } catch {
2407       $caught = 1;
2408     };
2409     return \%result if !$caught && scalar keys %result;
2410   }
2411
2412   my %result;
2413   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2414   $sth->execute;
2415   my @columns = @{$sth->{NAME_lc}};
2416   for my $i ( 0 .. $#columns ){
2417     my %column_info;
2418     $column_info{data_type} = $sth->{TYPE}->[$i];
2419     $column_info{size} = $sth->{PRECISION}->[$i];
2420     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2421
2422     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2423       $column_info{data_type} = $1;
2424       $column_info{size}    = $2;
2425     }
2426
2427     $result{$columns[$i]} = \%column_info;
2428   }
2429   $sth->finish;
2430
2431   foreach my $col (keys %result) {
2432     my $colinfo = $result{$col};
2433     my $type_num = $colinfo->{data_type};
2434     my $type_name;
2435     if(defined $type_num && $dbh->can('type_info')) {
2436       my $type_info = $dbh->type_info($type_num);
2437       $type_name = $type_info->{TYPE_NAME} if $type_info;
2438       $colinfo->{data_type} = $type_name if $type_name;
2439     }
2440   }
2441
2442   return \%result;
2443 }
2444
2445 sub columns_info_for {
2446   my ($self, $table) = @_;
2447   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2448 }
2449
2450 =head2 last_insert_id
2451
2452 Return the row id of the last insert.
2453
2454 =cut
2455
2456 sub _dbh_last_insert_id {
2457     my ($self, $dbh, $source, $col) = @_;
2458
2459     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2460
2461     return $id if defined $id;
2462
2463     my $class = ref $self;
2464     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2465 }
2466
2467 sub last_insert_id {
2468   my $self = shift;
2469   $self->_dbh_last_insert_id ($self->_dbh, @_);
2470 }
2471
2472 =head2 _native_data_type
2473
2474 =over 4
2475
2476 =item Arguments: $type_name
2477
2478 =back
2479
2480 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2481 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2482 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2483
2484 The default implementation returns C<undef>, implement in your Storage driver if
2485 you need this functionality.
2486
2487 Should map types from other databases to the native RDBMS type, for example
2488 C<VARCHAR2> to C<VARCHAR>.
2489
2490 Types with modifiers should map to the underlying data type. For example,
2491 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2492
2493 Composite types should map to the container type, for example
2494 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2495
2496 =cut
2497
2498 sub _native_data_type {
2499   #my ($self, $data_type) = @_;
2500   return undef
2501 }
2502
2503 # Check if placeholders are supported at all
2504 sub _determine_supports_placeholders {
2505   my $self = shift;
2506   my $dbh  = $self->_get_dbh;
2507
2508   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2509   # but it is inaccurate more often than not
2510   return try {
2511     local $dbh->{PrintError} = 0;
2512     local $dbh->{RaiseError} = 1;
2513     $dbh->do('select ?', {}, 1);
2514     1;
2515   }
2516   catch {
2517     0;
2518   };
2519 }
2520
2521 # Check if placeholders bound to non-string types throw exceptions
2522 #
2523 sub _determine_supports_typeless_placeholders {
2524   my $self = shift;
2525   my $dbh  = $self->_get_dbh;
2526
2527   return try {
2528     local $dbh->{PrintError} = 0;
2529     local $dbh->{RaiseError} = 1;
2530     # this specifically tests a bind that is NOT a string
2531     $dbh->do('select 1 where 1 = ?', {}, 1);
2532     1;
2533   }
2534   catch {
2535     0;
2536   };
2537 }
2538
2539 =head2 sqlt_type
2540
2541 Returns the database driver name.
2542
2543 =cut
2544
2545 sub sqlt_type {
2546   shift->_get_dbh->{Driver}->{Name};
2547 }
2548
2549 =head2 bind_attribute_by_data_type
2550
2551 Given a datatype from column info, returns a database specific bind
2552 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2553 let the database planner just handle it.
2554
2555 Generally only needed for special case column types, like bytea in postgres.
2556
2557 =cut
2558
2559 sub bind_attribute_by_data_type {
2560     return;
2561 }
2562
2563 =head2 is_datatype_numeric
2564
2565 Given a datatype from column_info, returns a boolean value indicating if
2566 the current RDBMS considers it a numeric value. This controls how
2567 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2568 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2569 be performed instead of the usual C<eq>.
2570
2571 =cut
2572
2573 sub is_datatype_numeric {
2574   #my ($self, $dt) = @_;
2575
2576   return 0 unless $_[1];
2577
2578   $_[1] =~ /^ (?:
2579     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2580   ) $/ix;
2581 }
2582
2583
2584 =head2 create_ddl_dir
2585
2586 =over 4
2587
2588 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2589
2590 =back
2591
2592 Creates a SQL file based on the Schema, for each of the specified
2593 database engines in C<\@databases> in the given directory.
2594 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2595
2596 Given a previous version number, this will also create a file containing
2597 the ALTER TABLE statements to transform the previous schema into the
2598 current one. Note that these statements may contain C<DROP TABLE> or
2599 C<DROP COLUMN> statements that can potentially destroy data.
2600
2601 The file names are created using the C<ddl_filename> method below, please
2602 override this method in your schema if you would like a different file
2603 name format. For the ALTER file, the same format is used, replacing
2604 $version in the name with "$preversion-$version".
2605
2606 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2607 The most common value for this would be C<< { add_drop_table => 1 } >>
2608 to have the SQL produced include a C<DROP TABLE> statement for each table
2609 created. For quoting purposes supply C<quote_table_names> and
2610 C<quote_field_names>.
2611
2612 If no arguments are passed, then the following default values are assumed:
2613
2614 =over 4
2615
2616 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2617
2618 =item version    - $schema->schema_version
2619
2620 =item directory  - './'
2621
2622 =item preversion - <none>
2623
2624 =back
2625
2626 By default, C<\%sqlt_args> will have
2627
2628  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2629
2630 merged with the hash passed in. To disable any of those features, pass in a
2631 hashref like the following
2632
2633  { ignore_constraint_names => 0, # ... other options }
2634
2635
2636 WARNING: You are strongly advised to check all SQL files created, before applying
2637 them.
2638
2639 =cut
2640
2641 sub create_ddl_dir {
2642   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2643
2644   unless ($dir) {
2645     carp "No directory given, using ./\n";
2646     $dir = './';
2647   } else {
2648       -d $dir
2649         or
2650       (require File::Path and File::Path::make_path ("$dir"))  # make_path does not like objects (i.e. Path::Class::Dir)
2651         or
2652       $self->throw_exception(
2653         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2654       );
2655   }
2656
2657   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2658
2659   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2660   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2661
2662   my $schema_version = $schema->schema_version || '1.x';
2663   $version ||= $schema_version;
2664
2665   $sqltargs = {
2666     add_drop_table => 1,
2667     ignore_constraint_names => 1,
2668     ignore_index_names => 1,
2669     %{$sqltargs || {}}
2670   };
2671
2672   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2673     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2674   }
2675
2676   my $sqlt = SQL::Translator->new( $sqltargs );
2677
2678   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2679   my $sqlt_schema = $sqlt->translate({ data => $schema })
2680     or $self->throw_exception ($sqlt->error);
2681
2682   foreach my $db (@$databases) {
2683     $sqlt->reset();
2684     $sqlt->{schema} = $sqlt_schema;
2685     $sqlt->producer($db);
2686
2687     my $file;
2688     my $filename = $schema->ddl_filename($db, $version, $dir);
2689     if (-e $filename && ($version eq $schema_version )) {
2690       # if we are dumping the current version, overwrite the DDL
2691       carp "Overwriting existing DDL file - $filename";
2692       unlink($filename);
2693     }
2694
2695     my $output = $sqlt->translate;
2696     if(!$output) {
2697       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2698       next;
2699     }
2700     if(!open($file, ">$filename")) {
2701       $self->throw_exception("Can't open $filename for writing ($!)");
2702       next;
2703     }
2704     print $file $output;
2705     close($file);
2706
2707     next unless ($preversion);
2708
2709     require SQL::Translator::Diff;
2710
2711     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2712     if(!-e $prefilename) {
2713       carp("No previous schema file found ($prefilename)");
2714       next;
2715     }
2716
2717     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2718     if(-e $difffile) {
2719       carp("Overwriting existing diff file - $difffile");
2720       unlink($difffile);
2721     }
2722
2723     my $source_schema;
2724     {
2725       my $t = SQL::Translator->new($sqltargs);
2726       $t->debug( 0 );
2727       $t->trace( 0 );
2728
2729       $t->parser( $db )
2730         or $self->throw_exception ($t->error);
2731
2732       my $out = $t->translate( $prefilename )
2733         or $self->throw_exception ($t->error);
2734
2735       $source_schema = $t->schema;
2736
2737       $source_schema->name( $prefilename )
2738         unless ( $source_schema->name );
2739     }
2740
2741     # The "new" style of producers have sane normalization and can support
2742     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2743     # And we have to diff parsed SQL against parsed SQL.
2744     my $dest_schema = $sqlt_schema;
2745
2746     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2747       my $t = SQL::Translator->new($sqltargs);
2748       $t->debug( 0 );
2749       $t->trace( 0 );
2750
2751       $t->parser( $db )
2752         or $self->throw_exception ($t->error);
2753
2754       my $out = $t->translate( $filename )
2755         or $self->throw_exception ($t->error);
2756
2757       $dest_schema = $t->schema;
2758
2759       $dest_schema->name( $filename )
2760         unless $dest_schema->name;
2761     }
2762
2763     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2764                                                   $dest_schema,   $db,
2765                                                   $sqltargs
2766                                                  );
2767     if(!open $file, ">$difffile") {
2768       $self->throw_exception("Can't write to $difffile ($!)");
2769       next;
2770     }
2771     print $file $diff;
2772     close($file);
2773   }
2774 }
2775
2776 =head2 deployment_statements
2777
2778 =over 4
2779
2780 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2781
2782 =back
2783
2784 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2785
2786 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2787 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2788
2789 C<$directory> is used to return statements from files in a previously created
2790 L</create_ddl_dir> directory and is optional. The filenames are constructed
2791 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2792
2793 If no C<$directory> is specified then the statements are constructed on the
2794 fly using L<SQL::Translator> and C<$version> is ignored.
2795
2796 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2797
2798 =cut
2799
2800 sub deployment_statements {
2801   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2802   $type ||= $self->sqlt_type;
2803   $version ||= $schema->schema_version || '1.x';
2804   $dir ||= './';
2805   my $filename = $schema->ddl_filename($type, $version, $dir);
2806   if(-f $filename)
2807   {
2808       # FIXME replace this block when a proper sane sql parser is available
2809       my $file;
2810       open($file, "<$filename")
2811         or $self->throw_exception("Can't open $filename ($!)");
2812       my @rows = <$file>;
2813       close($file);
2814       return join('', @rows);
2815   }
2816
2817   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2818     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2819   }
2820
2821   # sources needs to be a parser arg, but for simplicty allow at top level
2822   # coming in
2823   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2824       if exists $sqltargs->{sources};
2825
2826   my $tr = SQL::Translator->new(
2827     producer => "SQL::Translator::Producer::${type}",
2828     %$sqltargs,
2829     parser => 'SQL::Translator::Parser::DBIx::Class',
2830     data => $schema,
2831   );
2832
2833   my @ret;
2834   if (wantarray) {
2835     @ret = $tr->translate;
2836   }
2837   else {
2838     $ret[0] = $tr->translate;
2839   }
2840
2841   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2842     unless (@ret && defined $ret[0]);
2843
2844   return wantarray ? @ret : $ret[0];
2845 }
2846
2847 # FIXME deploy() currently does not accurately report sql errors
2848 # Will always return true while errors are warned
2849 sub deploy {
2850   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2851   my $deploy = sub {
2852     my $line = shift;
2853     return if(!$line);
2854     return if($line =~ /^--/);
2855     # next if($line =~ /^DROP/m);
2856     return if($line =~ /^BEGIN TRANSACTION/m);
2857     return if($line =~ /^COMMIT/m);
2858     return if $line =~ /^\s+$/; # skip whitespace only
2859     $self->_query_start($line);
2860     try {
2861       # do a dbh_do cycle here, as we need some error checking in
2862       # place (even though we will ignore errors)
2863       $self->dbh_do (sub { $_[1]->do($line) });
2864     } catch {
2865       carp qq{$_ (running "${line}")};
2866     };
2867     $self->_query_end($line);
2868   };
2869   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2870   if (@statements > 1) {
2871     foreach my $statement (@statements) {
2872       $deploy->( $statement );
2873     }
2874   }
2875   elsif (@statements == 1) {
2876     # split on single line comments and end of statements
2877     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
2878       $deploy->( $line );
2879     }
2880   }
2881 }
2882
2883 =head2 datetime_parser
2884
2885 Returns the datetime parser class
2886
2887 =cut
2888
2889 sub datetime_parser {
2890   my $self = shift;
2891   return $self->{datetime_parser} ||= do {
2892     $self->build_datetime_parser(@_);
2893   };
2894 }
2895
2896 =head2 datetime_parser_type
2897
2898 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
2899
2900 =head2 build_datetime_parser
2901
2902 See L</datetime_parser>
2903
2904 =cut
2905
2906 sub build_datetime_parser {
2907   my $self = shift;
2908   my $type = $self->datetime_parser_type(@_);
2909   return $type;
2910 }
2911
2912
2913 =head2 is_replicating
2914
2915 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2916 replicate from a master database.  Default is undef, which is the result
2917 returned by databases that don't support replication.
2918
2919 =cut
2920
2921 sub is_replicating {
2922     return;
2923
2924 }
2925
2926 =head2 lag_behind_master
2927
2928 Returns a number that represents a certain amount of lag behind a master db
2929 when a given storage is replicating.  The number is database dependent, but
2930 starts at zero and increases with the amount of lag. Default in undef
2931
2932 =cut
2933
2934 sub lag_behind_master {
2935     return;
2936 }
2937
2938 =head2 relname_to_table_alias
2939
2940 =over 4
2941
2942 =item Arguments: $relname, $join_count
2943
2944 =back
2945
2946 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2947 queries.
2948
2949 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2950 way these aliases are named.
2951
2952 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2953 otherwise C<"$relname">.
2954
2955 =cut
2956
2957 sub relname_to_table_alias {
2958   my ($self, $relname, $join_count) = @_;
2959
2960   my $alias = ($join_count && $join_count > 1 ?
2961     join('_', $relname, $join_count) : $relname);
2962
2963   return $alias;
2964 }
2965
2966 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2967 # version and it may be necessary to amend or override it for a specific storage
2968 # if such binds are necessary.
2969 sub _max_column_bytesize {
2970   my ($self, $attr) = @_;
2971
2972   my $max_size;
2973
2974   if ($attr->{sqlt_datatype}) {
2975     my $data_type = lc($attr->{sqlt_datatype});
2976
2977     if ($attr->{sqlt_size}) {
2978
2979       # String/sized-binary types
2980       if ($data_type =~ /^(?:
2981           l? (?:var)? char(?:acter)? (?:\s*varying)?
2982             |
2983           (?:var)? binary (?:\s*varying)?
2984             |
2985           raw
2986         )\b/x
2987       ) {
2988         $max_size = $attr->{sqlt_size};
2989       }
2990       # Other charset/unicode types, assume scale of 4
2991       elsif ($data_type =~ /^(?:
2992           national \s* character (?:\s*varying)?
2993             |
2994           nchar
2995             |
2996           univarchar
2997             |
2998           nvarchar
2999         )\b/x
3000       ) {
3001         $max_size = $attr->{sqlt_size} * 4;
3002       }
3003     }
3004
3005     if (!$max_size and !$self->_is_lob_type($data_type)) {
3006       $max_size = 100 # for all other (numeric?) datatypes
3007     }
3008   }
3009
3010   $max_size || $self->_dbic_connect_attributes->{LongReadLen} || $self->_get_dbh->{LongReadLen} || 8000;
3011 }
3012
3013 # Determine if a data_type is some type of BLOB
3014 sub _is_lob_type {
3015   my ($self, $data_type) = @_;
3016   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
3017     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
3018                                   |varchar|character\s*varying|nvarchar
3019                                   |national\s*character\s*varying))?\z/xi);
3020 }
3021
3022 sub _is_binary_lob_type {
3023   my ($self, $data_type) = @_;
3024   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
3025     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
3026 }
3027
3028 sub _is_text_lob_type {
3029   my ($self, $data_type) = @_;
3030   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
3031     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
3032                         |national\s*character\s*varying))\z/xi);
3033 }
3034
3035 1;
3036
3037 =head1 USAGE NOTES
3038
3039 =head2 DBIx::Class and AutoCommit
3040
3041 DBIx::Class can do some wonderful magic with handling exceptions,
3042 disconnections, and transactions when you use C<< AutoCommit => 1 >>
3043 (the default) combined with L<txn_do|DBIx::Class::Storage/txn_do> for
3044 transaction support.
3045
3046 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
3047 in an assumed transaction between commits, and you're telling us you'd
3048 like to manage that manually.  A lot of the magic protections offered by
3049 this module will go away.  We can't protect you from exceptions due to database
3050 disconnects because we don't know anything about how to restart your
3051 transactions.  You're on your own for handling all sorts of exceptional
3052 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
3053 be with raw DBI.
3054
3055
3056 =head1 AUTHORS
3057
3058 Matt S. Trout <mst@shadowcatsystems.co.uk>
3059
3060 Andy Grundman <andy@hybridized.org>
3061
3062 =head1 LICENSE
3063
3064 You may distribute this code under the same terms as Perl itself.
3065
3066 =cut