Add retrieve_on_insert column_info flag, to autoretrieve RDBMS-side defaults
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use DBIx::Class::Exception;
12 use Scalar::Util qw/refaddr weaken reftype blessed/;
13 use List::Util qw/first/;
14 use Sub::Name 'subname';
15 use Try::Tiny;
16 use overload ();
17 use namespace::clean;
18
19 # default cursor class, overridable in connect_info attributes
20 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
21
22 __PACKAGE__->mk_group_accessors('inherited' => qw/
23   sql_limit_dialect sql_quote_char sql_name_sep
24 /);
25
26 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
27
28 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
29 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
30
31 __PACKAGE__->sql_name_sep('.');
32
33 __PACKAGE__->mk_group_accessors('simple' => qw/
34   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
35   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts
36   transaction_depth _dbh_autocommit  savepoints
37 /);
38
39 # the values for these accessors are picked out (and deleted) from
40 # the attribute hashref passed to connect_info
41 my @storage_options = qw/
42   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
43   disable_sth_caching unsafe auto_savepoint
44 /;
45 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
46
47
48 # capability definitions, using a 2-tiered accessor system
49 # The rationale is:
50 #
51 # A driver/user may define _use_X, which blindly without any checks says:
52 # "(do not) use this capability", (use_dbms_capability is an "inherited"
53 # type accessor)
54 #
55 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
56 # accessor, which in turn calls _determine_supports_X, and stores the return
57 # in a special slot on the storage object, which is wiped every time a $dbh
58 # reconnection takes place (it is not guaranteed that upon reconnection we
59 # will get the same rdbms version). _determine_supports_X does not need to
60 # exist on a driver, as we ->can for it before calling.
61
62 my @capabilities = (qw/
63   insert_returning
64   insert_returning_bound
65   placeholders
66   typeless_placeholders
67   join_optimizer
68 /);
69 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
70 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
71
72 # on by default, not strictly a capability (pending rewrite)
73 __PACKAGE__->_use_join_optimizer (1);
74 sub _determine_supports_join_optimizer { 1 };
75
76 # Each of these methods need _determine_driver called before itself
77 # in order to function reliably. This is a purely DRY optimization
78 #
79 # get_(use)_dbms_capability need to be called on the correct Storage
80 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
81 # _determine_supports_X which obv. needs a correct driver as well
82 my @rdbms_specific_methods = qw/
83   deployment_statements
84   sqlt_type
85   sql_maker
86   build_datetime_parser
87   datetime_parser_type
88
89   insert
90   insert_bulk
91   update
92   delete
93   select
94   select_single
95
96   get_use_dbms_capability
97   get_dbms_capability
98
99   _server_info
100   _get_server_version
101 /;
102
103 for my $meth (@rdbms_specific_methods) {
104
105   my $orig = __PACKAGE__->can ($meth)
106     or die "$meth is not a ::Storage::DBI method!";
107
108   no strict qw/refs/;
109   no warnings qw/redefine/;
110   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
111     if (
112       # only fire when invoked on an instance, a valid class-based invocation
113       # would e.g. be setting a default for an inherited accessor
114       ref $_[0]
115         and
116       ! $_[0]->_driver_determined
117         and
118       ! $_[0]->{_in_determine_driver}
119     ) {
120       $_[0]->_determine_driver;
121
122       # This for some reason crashes and burns on perl 5.8.1
123       # IFF the method ends up throwing an exception
124       #goto $_[0]->can ($meth);
125
126       my $cref = $_[0]->can ($meth);
127       goto $cref;
128     }
129
130     goto $orig;
131   };
132 }
133
134
135 =head1 NAME
136
137 DBIx::Class::Storage::DBI - DBI storage handler
138
139 =head1 SYNOPSIS
140
141   my $schema = MySchema->connect('dbi:SQLite:my.db');
142
143   $schema->storage->debug(1);
144
145   my @stuff = $schema->storage->dbh_do(
146     sub {
147       my ($storage, $dbh, @args) = @_;
148       $dbh->do("DROP TABLE authors");
149     },
150     @column_list
151   );
152
153   $schema->resultset('Book')->search({
154      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
155   });
156
157 =head1 DESCRIPTION
158
159 This class represents the connection to an RDBMS via L<DBI>.  See
160 L<DBIx::Class::Storage> for general information.  This pod only
161 documents DBI-specific methods and behaviors.
162
163 =head1 METHODS
164
165 =cut
166
167 sub new {
168   my $new = shift->next::method(@_);
169
170   $new->transaction_depth(0);
171   $new->_sql_maker_opts({});
172   $new->_dbh_details({});
173   $new->{savepoints} = [];
174   $new->{_in_dbh_do} = 0;
175   $new->{_dbh_gen} = 0;
176
177   # read below to see what this does
178   $new->_arm_global_destructor;
179
180   $new;
181 }
182
183 # This is hack to work around perl shooting stuff in random
184 # order on exit(). If we do not walk the remaining storage
185 # objects in an END block, there is a *small but real* chance
186 # of a fork()ed child to kill the parent's shared DBI handle,
187 # *before perl reaches the DESTROY in this package*
188 # Yes, it is ugly and effective.
189 # Additionally this registry is used by the CLONE method to
190 # make sure no handles are shared between threads
191 {
192   my %seek_and_destroy;
193
194   sub _arm_global_destructor {
195     my $self = shift;
196     my $key = refaddr ($self);
197     $seek_and_destroy{$key} = $self;
198     weaken ($seek_and_destroy{$key});
199   }
200
201   END {
202     local $?; # just in case the DBI destructor changes it somehow
203
204     # destroy just the object if not native to this process/thread
205     $_->_verify_pid for (grep
206       { defined $_ }
207       values %seek_and_destroy
208     );
209   }
210
211   sub CLONE {
212     # As per DBI's recommendation, DBIC disconnects all handles as
213     # soon as possible (DBIC will reconnect only on demand from within
214     # the thread)
215     for (values %seek_and_destroy) {
216       next unless $_;
217       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
218       $_->_dbh(undef);
219     }
220   }
221 }
222
223 sub DESTROY {
224   my $self = shift;
225
226   # some databases spew warnings on implicit disconnect
227   local $SIG{__WARN__} = sub {};
228   $self->_dbh(undef);
229
230   # this op is necessary, since the very last perl runtime statement
231   # triggers a global destruction shootout, and the $SIG localization
232   # may very well be destroyed before perl actually gets to do the
233   # $dbh undef
234   1;
235 }
236
237 # handle pid changes correctly - do not destroy parent's connection
238 sub _verify_pid {
239   my $self = shift;
240
241   my $pid = $self->_conn_pid;
242   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
243     $dbh->{InactiveDestroy} = 1;
244     $self->{_dbh_gen}++;
245     $self->_dbh(undef);
246   }
247
248   return;
249 }
250
251 =head2 connect_info
252
253 This method is normally called by L<DBIx::Class::Schema/connection>, which
254 encapsulates its argument list in an arrayref before passing them here.
255
256 The argument list may contain:
257
258 =over
259
260 =item *
261
262 The same 4-element argument set one would normally pass to
263 L<DBI/connect>, optionally followed by
264 L<extra attributes|/DBIx::Class specific connection attributes>
265 recognized by DBIx::Class:
266
267   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
268
269 =item *
270
271 A single code reference which returns a connected
272 L<DBI database handle|DBI/connect> optionally followed by
273 L<extra attributes|/DBIx::Class specific connection attributes> recognized
274 by DBIx::Class:
275
276   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
277
278 =item *
279
280 A single hashref with all the attributes and the dsn/user/password
281 mixed together:
282
283   $connect_info_args = [{
284     dsn => $dsn,
285     user => $user,
286     password => $pass,
287     %dbi_attributes,
288     %extra_attributes,
289   }];
290
291   $connect_info_args = [{
292     dbh_maker => sub { DBI->connect (...) },
293     %dbi_attributes,
294     %extra_attributes,
295   }];
296
297 This is particularly useful for L<Catalyst> based applications, allowing the
298 following config (L<Config::General> style):
299
300   <Model::DB>
301     schema_class   App::DB
302     <connect_info>
303       dsn          dbi:mysql:database=test
304       user         testuser
305       password     TestPass
306       AutoCommit   1
307     </connect_info>
308   </Model::DB>
309
310 The C<dsn>/C<user>/C<password> combination can be substituted by the
311 C<dbh_maker> key whose value is a coderef that returns a connected
312 L<DBI database handle|DBI/connect>
313
314 =back
315
316 Please note that the L<DBI> docs recommend that you always explicitly
317 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
318 recommends that it be set to I<1>, and that you perform transactions
319 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
320 to I<1> if you do not do explicitly set it to zero.  This is the default
321 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
322
323 =head3 DBIx::Class specific connection attributes
324
325 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
326 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
327 the following connection options. These options can be mixed in with your other
328 L<DBI> connection attributes, or placed in a separate hashref
329 (C<\%extra_attributes>) as shown above.
330
331 Every time C<connect_info> is invoked, any previous settings for
332 these options will be cleared before setting the new ones, regardless of
333 whether any options are specified in the new C<connect_info>.
334
335
336 =over
337
338 =item on_connect_do
339
340 Specifies things to do immediately after connecting or re-connecting to
341 the database.  Its value may contain:
342
343 =over
344
345 =item a scalar
346
347 This contains one SQL statement to execute.
348
349 =item an array reference
350
351 This contains SQL statements to execute in order.  Each element contains
352 a string or a code reference that returns a string.
353
354 =item a code reference
355
356 This contains some code to execute.  Unlike code references within an
357 array reference, its return value is ignored.
358
359 =back
360
361 =item on_disconnect_do
362
363 Takes arguments in the same form as L</on_connect_do> and executes them
364 immediately before disconnecting from the database.
365
366 Note, this only runs if you explicitly call L</disconnect> on the
367 storage object.
368
369 =item on_connect_call
370
371 A more generalized form of L</on_connect_do> that calls the specified
372 C<connect_call_METHOD> methods in your storage driver.
373
374   on_connect_do => 'select 1'
375
376 is equivalent to:
377
378   on_connect_call => [ [ do_sql => 'select 1' ] ]
379
380 Its values may contain:
381
382 =over
383
384 =item a scalar
385
386 Will call the C<connect_call_METHOD> method.
387
388 =item a code reference
389
390 Will execute C<< $code->($storage) >>
391
392 =item an array reference
393
394 Each value can be a method name or code reference.
395
396 =item an array of arrays
397
398 For each array, the first item is taken to be the C<connect_call_> method name
399 or code reference, and the rest are parameters to it.
400
401 =back
402
403 Some predefined storage methods you may use:
404
405 =over
406
407 =item do_sql
408
409 Executes a SQL string or a code reference that returns a SQL string. This is
410 what L</on_connect_do> and L</on_disconnect_do> use.
411
412 It can take:
413
414 =over
415
416 =item a scalar
417
418 Will execute the scalar as SQL.
419
420 =item an arrayref
421
422 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
423 attributes hashref and bind values.
424
425 =item a code reference
426
427 Will execute C<< $code->($storage) >> and execute the return array refs as
428 above.
429
430 =back
431
432 =item datetime_setup
433
434 Execute any statements necessary to initialize the database session to return
435 and accept datetime/timestamp values used with
436 L<DBIx::Class::InflateColumn::DateTime>.
437
438 Only necessary for some databases, see your specific storage driver for
439 implementation details.
440
441 =back
442
443 =item on_disconnect_call
444
445 Takes arguments in the same form as L</on_connect_call> and executes them
446 immediately before disconnecting from the database.
447
448 Calls the C<disconnect_call_METHOD> methods as opposed to the
449 C<connect_call_METHOD> methods called by L</on_connect_call>.
450
451 Note, this only runs if you explicitly call L</disconnect> on the
452 storage object.
453
454 =item disable_sth_caching
455
456 If set to a true value, this option will disable the caching of
457 statement handles via L<DBI/prepare_cached>.
458
459 =item limit_dialect
460
461 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
462 default L</sql_limit_dialect> setting of the storage (if any). For a list
463 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
464
465 =item quote_names
466
467 When true automatically sets L</quote_char> and L</name_sep> to the characters
468 appropriate for your particular RDBMS. This option is preferred over specifying
469 L</quote_char> directly.
470
471 =item quote_char
472
473 Specifies what characters to use to quote table and column names.
474
475 C<quote_char> expects either a single character, in which case is it
476 is placed on either side of the table/column name, or an arrayref of length
477 2 in which case the table/column name is placed between the elements.
478
479 For example under MySQL you should use C<< quote_char => '`' >>, and for
480 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
481
482 =item name_sep
483
484 This parameter is only useful in conjunction with C<quote_char>, and is used to
485 specify the character that separates elements (schemas, tables, columns) from
486 each other. If unspecified it defaults to the most commonly used C<.>.
487
488 =item unsafe
489
490 This Storage driver normally installs its own C<HandleError>, sets
491 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
492 all database handles, including those supplied by a coderef.  It does this
493 so that it can have consistent and useful error behavior.
494
495 If you set this option to a true value, Storage will not do its usual
496 modifications to the database handle's attributes, and instead relies on
497 the settings in your connect_info DBI options (or the values you set in
498 your connection coderef, in the case that you are connecting via coderef).
499
500 Note that your custom settings can cause Storage to malfunction,
501 especially if you set a C<HandleError> handler that suppresses exceptions
502 and/or disable C<RaiseError>.
503
504 =item auto_savepoint
505
506 If this option is true, L<DBIx::Class> will use savepoints when nesting
507 transactions, making it possible to recover from failure in the inner
508 transaction without having to abort all outer transactions.
509
510 =item cursor_class
511
512 Use this argument to supply a cursor class other than the default
513 L<DBIx::Class::Storage::DBI::Cursor>.
514
515 =back
516
517 Some real-life examples of arguments to L</connect_info> and
518 L<DBIx::Class::Schema/connect>
519
520   # Simple SQLite connection
521   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
522
523   # Connect via subref
524   ->connect_info([ sub { DBI->connect(...) } ]);
525
526   # Connect via subref in hashref
527   ->connect_info([{
528     dbh_maker => sub { DBI->connect(...) },
529     on_connect_do => 'alter session ...',
530   }]);
531
532   # A bit more complicated
533   ->connect_info(
534     [
535       'dbi:Pg:dbname=foo',
536       'postgres',
537       'my_pg_password',
538       { AutoCommit => 1 },
539       { quote_char => q{"} },
540     ]
541   );
542
543   # Equivalent to the previous example
544   ->connect_info(
545     [
546       'dbi:Pg:dbname=foo',
547       'postgres',
548       'my_pg_password',
549       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
550     ]
551   );
552
553   # Same, but with hashref as argument
554   # See parse_connect_info for explanation
555   ->connect_info(
556     [{
557       dsn         => 'dbi:Pg:dbname=foo',
558       user        => 'postgres',
559       password    => 'my_pg_password',
560       AutoCommit  => 1,
561       quote_char  => q{"},
562       name_sep    => q{.},
563     }]
564   );
565
566   # Subref + DBIx::Class-specific connection options
567   ->connect_info(
568     [
569       sub { DBI->connect(...) },
570       {
571           quote_char => q{`},
572           name_sep => q{@},
573           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
574           disable_sth_caching => 1,
575       },
576     ]
577   );
578
579
580
581 =cut
582
583 sub connect_info {
584   my ($self, $info) = @_;
585
586   return $self->_connect_info if !$info;
587
588   $self->_connect_info($info); # copy for _connect_info
589
590   $info = $self->_normalize_connect_info($info)
591     if ref $info eq 'ARRAY';
592
593   for my $storage_opt (keys %{ $info->{storage_options} }) {
594     my $value = $info->{storage_options}{$storage_opt};
595
596     $self->$storage_opt($value);
597   }
598
599   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
600   #  the new set of options
601   $self->_sql_maker(undef);
602   $self->_sql_maker_opts({});
603
604   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
605     my $value = $info->{sql_maker_options}{$sql_maker_opt};
606
607     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
608   }
609
610   my %attrs = (
611     %{ $self->_default_dbi_connect_attributes || {} },
612     %{ $info->{attributes} || {} },
613   );
614
615   my @args = @{ $info->{arguments} };
616
617   if (keys %attrs and ref $args[0] ne 'CODE') {
618     carp
619         'You provided explicit AutoCommit => 0 in your connection_info. '
620       . 'This is almost universally a bad idea (see the footnotes of '
621       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
622       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
623       . 'this warning.'
624       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
625
626     push @args, \%attrs if keys %attrs;
627   }
628   $self->_dbi_connect_info(\@args);
629
630   # FIXME - dirty:
631   # save attributes them in a separate accessor so they are always
632   # introspectable, even in case of a CODE $dbhmaker
633   $self->_dbic_connect_attributes (\%attrs);
634
635   return $self->_connect_info;
636 }
637
638 sub _normalize_connect_info {
639   my ($self, $info_arg) = @_;
640   my %info;
641
642   my @args = @$info_arg;  # take a shallow copy for further mutilation
643
644   # combine/pre-parse arguments depending on invocation style
645
646   my %attrs;
647   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
648     %attrs = %{ $args[1] || {} };
649     @args = $args[0];
650   }
651   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
652     %attrs = %{$args[0]};
653     @args = ();
654     if (my $code = delete $attrs{dbh_maker}) {
655       @args = $code;
656
657       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
658       if (@ignored) {
659         carp sprintf (
660             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
661           . "to the result of 'dbh_maker'",
662
663           join (', ', map { "'$_'" } (@ignored) ),
664         );
665       }
666     }
667     else {
668       @args = delete @attrs{qw/dsn user password/};
669     }
670   }
671   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
672     %attrs = (
673       % { $args[3] || {} },
674       % { $args[4] || {} },
675     );
676     @args = @args[0,1,2];
677   }
678
679   $info{arguments} = \@args;
680
681   my @storage_opts = grep exists $attrs{$_},
682     @storage_options, 'cursor_class';
683
684   @{ $info{storage_options} }{@storage_opts} =
685     delete @attrs{@storage_opts} if @storage_opts;
686
687   my @sql_maker_opts = grep exists $attrs{$_},
688     qw/limit_dialect quote_char name_sep quote_names/;
689
690   @{ $info{sql_maker_options} }{@sql_maker_opts} =
691     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
692
693   $info{attributes} = \%attrs if %attrs;
694
695   return \%info;
696 }
697
698 sub _default_dbi_connect_attributes () {
699   +{
700     AutoCommit => 1,
701     PrintError => 0,
702     RaiseError => 1,
703     ShowErrorStatement => 1,
704   };
705 }
706
707 =head2 on_connect_do
708
709 This method is deprecated in favour of setting via L</connect_info>.
710
711 =cut
712
713 =head2 on_disconnect_do
714
715 This method is deprecated in favour of setting via L</connect_info>.
716
717 =cut
718
719 sub _parse_connect_do {
720   my ($self, $type) = @_;
721
722   my $val = $self->$type;
723   return () if not defined $val;
724
725   my @res;
726
727   if (not ref($val)) {
728     push @res, [ 'do_sql', $val ];
729   } elsif (ref($val) eq 'CODE') {
730     push @res, $val;
731   } elsif (ref($val) eq 'ARRAY') {
732     push @res, map { [ 'do_sql', $_ ] } @$val;
733   } else {
734     $self->throw_exception("Invalid type for $type: ".ref($val));
735   }
736
737   return \@res;
738 }
739
740 =head2 dbh_do
741
742 Arguments: ($subref | $method_name), @extra_coderef_args?
743
744 Execute the given $subref or $method_name using the new exception-based
745 connection management.
746
747 The first two arguments will be the storage object that C<dbh_do> was called
748 on and a database handle to use.  Any additional arguments will be passed
749 verbatim to the called subref as arguments 2 and onwards.
750
751 Using this (instead of $self->_dbh or $self->dbh) ensures correct
752 exception handling and reconnection (or failover in future subclasses).
753
754 Your subref should have no side-effects outside of the database, as
755 there is the potential for your subref to be partially double-executed
756 if the database connection was stale/dysfunctional.
757
758 Example:
759
760   my @stuff = $schema->storage->dbh_do(
761     sub {
762       my ($storage, $dbh, @cols) = @_;
763       my $cols = join(q{, }, @cols);
764       $dbh->selectrow_array("SELECT $cols FROM foo");
765     },
766     @column_list
767   );
768
769 =cut
770
771 sub dbh_do {
772   my $self = shift;
773   my $code = shift;
774
775   my $dbh = $self->_get_dbh;
776
777   return $self->$code($dbh, @_)
778     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
779
780   local $self->{_in_dbh_do} = 1;
781
782   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
783   my $args = \@_;
784   return try {
785     $self->$code ($dbh, @$args);
786   } catch {
787     $self->throw_exception($_) if $self->connected;
788
789     # We were not connected - reconnect and retry, but let any
790     #  exception fall right through this time
791     carp "Retrying $code after catching disconnected exception: $_"
792       if $ENV{DBIC_DBIRETRY_DEBUG};
793
794     $self->_populate_dbh;
795     $self->$code($self->_dbh, @$args);
796   };
797 }
798
799 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
800 # It also informs dbh_do to bypass itself while under the direction of txn_do,
801 # via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
802 sub txn_do {
803   my $self = shift;
804   my $coderef = shift;
805
806   ref $coderef eq 'CODE' or $self->throw_exception
807     ('$coderef must be a CODE reference');
808
809   local $self->{_in_dbh_do} = 1;
810
811   my @result;
812   my $want = wantarray;
813
814   my $tried = 0;
815   while(1) {
816     my $exception;
817
818     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
819     my $args = \@_;
820
821     try {
822       $self->txn_begin;
823       my $txn_start_depth = $self->transaction_depth;
824       if($want) {
825           @result = $coderef->(@$args);
826       }
827       elsif(defined $want) {
828           $result[0] = $coderef->(@$args);
829       }
830       else {
831           $coderef->(@$args);
832       }
833
834       my $delta_txn = $txn_start_depth - $self->transaction_depth;
835       if ($delta_txn == 0) {
836         $self->txn_commit;
837       }
838       elsif ($delta_txn != 1) {
839         # an off-by-one would mean we fired a rollback
840         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
841       }
842     } catch {
843       $exception = $_;
844     };
845
846     if(! defined $exception) { return wantarray ? @result : $result[0] }
847
848     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
849       my $rollback_exception;
850       try { $self->txn_rollback } catch { $rollback_exception = shift };
851       if(defined $rollback_exception) {
852         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
853         $self->throw_exception($exception)  # propagate nested rollback
854           if $rollback_exception =~ /$exception_class/;
855
856         $self->throw_exception(
857           "Transaction aborted: ${exception}. "
858           . "Rollback failed: ${rollback_exception}"
859         );
860       }
861       $self->throw_exception($exception)
862     }
863
864     # We were not connected, and was first try - reconnect and retry
865     # via the while loop
866     carp "Retrying $coderef after catching disconnected exception: $exception"
867       if $ENV{DBIC_TXNRETRY_DEBUG};
868     $self->_populate_dbh;
869   }
870 }
871
872 =head2 disconnect
873
874 Our C<disconnect> method also performs a rollback first if the
875 database is not in C<AutoCommit> mode.
876
877 =cut
878
879 sub disconnect {
880   my ($self) = @_;
881
882   if( $self->_dbh ) {
883     my @actions;
884
885     push @actions, ( $self->on_disconnect_call || () );
886     push @actions, $self->_parse_connect_do ('on_disconnect_do');
887
888     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
889
890     $self->_dbh_rollback unless $self->_dbh_autocommit;
891
892     %{ $self->_dbh->{CachedKids} } = ();
893     $self->_dbh->disconnect;
894     $self->_dbh(undef);
895     $self->{_dbh_gen}++;
896   }
897 }
898
899 =head2 with_deferred_fk_checks
900
901 =over 4
902
903 =item Arguments: C<$coderef>
904
905 =item Return Value: The return value of $coderef
906
907 =back
908
909 Storage specific method to run the code ref with FK checks deferred or
910 in MySQL's case disabled entirely.
911
912 =cut
913
914 # Storage subclasses should override this
915 sub with_deferred_fk_checks {
916   my ($self, $sub) = @_;
917   $sub->();
918 }
919
920 =head2 connected
921
922 =over
923
924 =item Arguments: none
925
926 =item Return Value: 1|0
927
928 =back
929
930 Verifies that the current database handle is active and ready to execute
931 an SQL statement (e.g. the connection did not get stale, server is still
932 answering, etc.) This method is used internally by L</dbh>.
933
934 =cut
935
936 sub connected {
937   my $self = shift;
938   return 0 unless $self->_seems_connected;
939
940   #be on the safe side
941   local $self->_dbh->{RaiseError} = 1;
942
943   return $self->_ping;
944 }
945
946 sub _seems_connected {
947   my $self = shift;
948
949   $self->_verify_pid;
950
951   my $dbh = $self->_dbh
952     or return 0;
953
954   return $dbh->FETCH('Active');
955 }
956
957 sub _ping {
958   my $self = shift;
959
960   my $dbh = $self->_dbh or return 0;
961
962   return $dbh->ping;
963 }
964
965 sub ensure_connected {
966   my ($self) = @_;
967
968   unless ($self->connected) {
969     $self->_populate_dbh;
970   }
971 }
972
973 =head2 dbh
974
975 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
976 is guaranteed to be healthy by implicitly calling L</connected>, and if
977 necessary performing a reconnection before returning. Keep in mind that this
978 is very B<expensive> on some database engines. Consider using L</dbh_do>
979 instead.
980
981 =cut
982
983 sub dbh {
984   my ($self) = @_;
985
986   if (not $self->_dbh) {
987     $self->_populate_dbh;
988   } else {
989     $self->ensure_connected;
990   }
991   return $self->_dbh;
992 }
993
994 # this is the internal "get dbh or connect (don't check)" method
995 sub _get_dbh {
996   my $self = shift;
997   $self->_verify_pid;
998   $self->_populate_dbh unless $self->_dbh;
999   return $self->_dbh;
1000 }
1001
1002 sub sql_maker {
1003   my ($self) = @_;
1004   unless ($self->_sql_maker) {
1005     my $sql_maker_class = $self->sql_maker_class;
1006
1007     my %opts = %{$self->_sql_maker_opts||{}};
1008     my $dialect =
1009       $opts{limit_dialect}
1010         ||
1011       $self->sql_limit_dialect
1012         ||
1013       do {
1014         my $s_class = (ref $self) || $self;
1015         carp (
1016           "Your storage class ($s_class) does not set sql_limit_dialect and you "
1017         . 'have not supplied an explicit limit_dialect in your connection_info. '
1018         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
1019         . 'databases but can be (and often is) painfully slow. '
1020         . "Please file an RT ticket against '$s_class' ."
1021         );
1022
1023         'GenericSubQ';
1024       }
1025     ;
1026
1027     my ($quote_char, $name_sep);
1028
1029     if ($opts{quote_names}) {
1030       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
1031         my $s_class = (ref $self) || $self;
1032         carp (
1033           "You requested 'quote_names' but your storage class ($s_class) does "
1034         . 'not explicitly define a default sql_quote_char and you have not '
1035         . 'supplied a quote_char as part of your connection_info. DBIC will '
1036         .q{default to the ANSI SQL standard quote '"', which works most of }
1037         . "the time. Please file an RT ticket against '$s_class'."
1038         );
1039
1040         '"'; # RV
1041       };
1042
1043       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
1044     }
1045
1046     $self->_sql_maker($sql_maker_class->new(
1047       bindtype=>'columns',
1048       array_datatypes => 1,
1049       limit_dialect => $dialect,
1050       ($quote_char ? (quote_char => $quote_char) : ()),
1051       name_sep => ($name_sep || '.'),
1052       %opts,
1053     ));
1054   }
1055   return $self->_sql_maker;
1056 }
1057
1058 # nothing to do by default
1059 sub _rebless {}
1060 sub _init {}
1061
1062 sub _populate_dbh {
1063   my ($self) = @_;
1064
1065   my @info = @{$self->_dbi_connect_info || []};
1066   $self->_dbh(undef); # in case ->connected failed we might get sent here
1067   $self->_dbh_details({}); # reset everything we know
1068
1069   $self->_dbh($self->_connect(@info));
1070
1071   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1072
1073   $self->_determine_driver;
1074
1075   # Always set the transaction depth on connect, since
1076   #  there is no transaction in progress by definition
1077   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1078
1079   $self->_run_connection_actions unless $self->{_in_determine_driver};
1080 }
1081
1082 sub _run_connection_actions {
1083   my $self = shift;
1084   my @actions;
1085
1086   push @actions, ( $self->on_connect_call || () );
1087   push @actions, $self->_parse_connect_do ('on_connect_do');
1088
1089   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1090 }
1091
1092
1093
1094 sub set_use_dbms_capability {
1095   $_[0]->set_inherited ($_[1], $_[2]);
1096 }
1097
1098 sub get_use_dbms_capability {
1099   my ($self, $capname) = @_;
1100
1101   my $use = $self->get_inherited ($capname);
1102   return defined $use
1103     ? $use
1104     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1105   ;
1106 }
1107
1108 sub set_dbms_capability {
1109   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1110 }
1111
1112 sub get_dbms_capability {
1113   my ($self, $capname) = @_;
1114
1115   my $cap = $self->_dbh_details->{capability}{$capname};
1116
1117   unless (defined $cap) {
1118     if (my $meth = $self->can ("_determine$capname")) {
1119       $cap = $self->$meth ? 1 : 0;
1120     }
1121     else {
1122       $cap = 0;
1123     }
1124
1125     $self->set_dbms_capability ($capname, $cap);
1126   }
1127
1128   return $cap;
1129 }
1130
1131 sub _server_info {
1132   my $self = shift;
1133
1134   my $info;
1135   unless ($info = $self->_dbh_details->{info}) {
1136
1137     $info = {};
1138
1139     my $server_version = try { $self->_get_server_version };
1140
1141     if (defined $server_version) {
1142       $info->{dbms_version} = $server_version;
1143
1144       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1145       my @verparts = split (/\./, $numeric_version);
1146       if (
1147         @verparts
1148           &&
1149         $verparts[0] <= 999
1150       ) {
1151         # consider only up to 3 version parts, iff not more than 3 digits
1152         my @use_parts;
1153         while (@verparts && @use_parts < 3) {
1154           my $p = shift @verparts;
1155           last if $p > 999;
1156           push @use_parts, $p;
1157         }
1158         push @use_parts, 0 while @use_parts < 3;
1159
1160         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1161       }
1162     }
1163
1164     $self->_dbh_details->{info} = $info;
1165   }
1166
1167   return $info;
1168 }
1169
1170 sub _get_server_version {
1171   shift->_dbh_get_info(18);
1172 }
1173
1174 sub _dbh_get_info {
1175   my ($self, $info) = @_;
1176
1177   return try { $self->_get_dbh->get_info($info) } || undef;
1178 }
1179
1180 sub _determine_driver {
1181   my ($self) = @_;
1182
1183   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1184     my $started_connected = 0;
1185     local $self->{_in_determine_driver} = 1;
1186
1187     if (ref($self) eq __PACKAGE__) {
1188       my $driver;
1189       if ($self->_dbh) { # we are connected
1190         $driver = $self->_dbh->{Driver}{Name};
1191         $started_connected = 1;
1192       } else {
1193         # if connect_info is a CODEREF, we have no choice but to connect
1194         if (ref $self->_dbi_connect_info->[0] &&
1195             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1196           $self->_populate_dbh;
1197           $driver = $self->_dbh->{Driver}{Name};
1198         }
1199         else {
1200           # try to use dsn to not require being connected, the driver may still
1201           # force a connection in _rebless to determine version
1202           # (dsn may not be supplied at all if all we do is make a mock-schema)
1203           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1204           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1205           $driver ||= $ENV{DBI_DRIVER};
1206         }
1207       }
1208
1209       if ($driver) {
1210         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1211         if ($self->load_optional_class($storage_class)) {
1212           mro::set_mro($storage_class, 'c3');
1213           bless $self, $storage_class;
1214           $self->_rebless();
1215         }
1216       }
1217     }
1218
1219     $self->_driver_determined(1);
1220
1221     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1222
1223     $self->_init; # run driver-specific initializations
1224
1225     $self->_run_connection_actions
1226         if !$started_connected && defined $self->_dbh;
1227   }
1228 }
1229
1230 sub _do_connection_actions {
1231   my $self          = shift;
1232   my $method_prefix = shift;
1233   my $call          = shift;
1234
1235   if (not ref($call)) {
1236     my $method = $method_prefix . $call;
1237     $self->$method(@_);
1238   } elsif (ref($call) eq 'CODE') {
1239     $self->$call(@_);
1240   } elsif (ref($call) eq 'ARRAY') {
1241     if (ref($call->[0]) ne 'ARRAY') {
1242       $self->_do_connection_actions($method_prefix, $_) for @$call;
1243     } else {
1244       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1245     }
1246   } else {
1247     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1248   }
1249
1250   return $self;
1251 }
1252
1253 sub connect_call_do_sql {
1254   my $self = shift;
1255   $self->_do_query(@_);
1256 }
1257
1258 sub disconnect_call_do_sql {
1259   my $self = shift;
1260   $self->_do_query(@_);
1261 }
1262
1263 # override in db-specific backend when necessary
1264 sub connect_call_datetime_setup { 1 }
1265
1266 sub _do_query {
1267   my ($self, $action) = @_;
1268
1269   if (ref $action eq 'CODE') {
1270     $action = $action->($self);
1271     $self->_do_query($_) foreach @$action;
1272   }
1273   else {
1274     # Most debuggers expect ($sql, @bind), so we need to exclude
1275     # the attribute hash which is the second argument to $dbh->do
1276     # furthermore the bind values are usually to be presented
1277     # as named arrayref pairs, so wrap those here too
1278     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1279     my $sql = shift @do_args;
1280     my $attrs = shift @do_args;
1281     my @bind = map { [ undef, $_ ] } @do_args;
1282
1283     $self->_query_start($sql, \@bind);
1284     $self->_get_dbh->do($sql, $attrs, @do_args);
1285     $self->_query_end($sql, \@bind);
1286   }
1287
1288   return $self;
1289 }
1290
1291 sub _connect {
1292   my ($self, @info) = @_;
1293
1294   $self->throw_exception("You failed to provide any connection info")
1295     if !@info;
1296
1297   my ($old_connect_via, $dbh);
1298
1299   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1300     $old_connect_via = $DBI::connect_via;
1301     $DBI::connect_via = 'connect';
1302   }
1303
1304   try {
1305     if(ref $info[0] eq 'CODE') {
1306       $dbh = $info[0]->();
1307     }
1308     else {
1309       require DBI;
1310       $dbh = DBI->connect(@info);
1311     }
1312
1313     if (!$dbh) {
1314       die $DBI::errstr;
1315     }
1316
1317     unless ($self->unsafe) {
1318
1319       $self->throw_exception(
1320         'Refusing clobbering of {HandleError} installed on externally supplied '
1321        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1322       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1323
1324       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1325       # request, or an external handle. Complain and set anyway
1326       unless ($dbh->{RaiseError}) {
1327         carp( ref $info[0] eq 'CODE'
1328
1329           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1330            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1331            .'attribute has been supplied'
1332
1333           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1334            .'unsafe => 1. Toggling RaiseError back to true'
1335         );
1336
1337         $dbh->{RaiseError} = 1;
1338       }
1339
1340       # this odd anonymous coderef dereference is in fact really
1341       # necessary to avoid the unwanted effect described in perl5
1342       # RT#75792
1343       sub {
1344         my $weak_self = $_[0];
1345         weaken $weak_self;
1346
1347         # the coderef is blessed so we can distinguish it from externally
1348         # supplied handles (which must be preserved)
1349         $_[1]->{HandleError} = bless sub {
1350           if ($weak_self) {
1351             $weak_self->throw_exception("DBI Exception: $_[0]");
1352           }
1353           else {
1354             # the handler may be invoked by something totally out of
1355             # the scope of DBIC
1356             DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1357           }
1358         }, '__DBIC__DBH__ERROR__HANDLER__';
1359       }->($self, $dbh);
1360     }
1361   }
1362   catch {
1363     $self->throw_exception("DBI Connection failed: $_")
1364   }
1365   finally {
1366     $DBI::connect_via = $old_connect_via if $old_connect_via;
1367   };
1368
1369   $self->_dbh_autocommit($dbh->{AutoCommit});
1370   $dbh;
1371 }
1372
1373 sub svp_begin {
1374   my ($self, $name) = @_;
1375
1376   $name = $self->_svp_generate_name
1377     unless defined $name;
1378
1379   $self->throw_exception ("You can't use savepoints outside a transaction")
1380     if $self->{transaction_depth} == 0;
1381
1382   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1383     unless $self->can('_svp_begin');
1384
1385   push @{ $self->{savepoints} }, $name;
1386
1387   $self->debugobj->svp_begin($name) if $self->debug;
1388
1389   return $self->_svp_begin($name);
1390 }
1391
1392 sub svp_release {
1393   my ($self, $name) = @_;
1394
1395   $self->throw_exception ("You can't use savepoints outside a transaction")
1396     if $self->{transaction_depth} == 0;
1397
1398   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1399     unless $self->can('_svp_release');
1400
1401   if (defined $name) {
1402     $self->throw_exception ("Savepoint '$name' does not exist")
1403       unless grep { $_ eq $name } @{ $self->{savepoints} };
1404
1405     # Dig through the stack until we find the one we are releasing.  This keeps
1406     # the stack up to date.
1407     my $svp;
1408
1409     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1410   } else {
1411     $name = pop @{ $self->{savepoints} };
1412   }
1413
1414   $self->debugobj->svp_release($name) if $self->debug;
1415
1416   return $self->_svp_release($name);
1417 }
1418
1419 sub svp_rollback {
1420   my ($self, $name) = @_;
1421
1422   $self->throw_exception ("You can't use savepoints outside a transaction")
1423     if $self->{transaction_depth} == 0;
1424
1425   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1426     unless $self->can('_svp_rollback');
1427
1428   if (defined $name) {
1429       # If they passed us a name, verify that it exists in the stack
1430       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1431           $self->throw_exception("Savepoint '$name' does not exist!");
1432       }
1433
1434       # Dig through the stack until we find the one we are releasing.  This keeps
1435       # the stack up to date.
1436       while(my $s = pop(@{ $self->{savepoints} })) {
1437           last if($s eq $name);
1438       }
1439       # Add the savepoint back to the stack, as a rollback doesn't remove the
1440       # named savepoint, only everything after it.
1441       push(@{ $self->{savepoints} }, $name);
1442   } else {
1443       # We'll assume they want to rollback to the last savepoint
1444       $name = $self->{savepoints}->[-1];
1445   }
1446
1447   $self->debugobj->svp_rollback($name) if $self->debug;
1448
1449   return $self->_svp_rollback($name);
1450 }
1451
1452 sub _svp_generate_name {
1453   my ($self) = @_;
1454   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1455 }
1456
1457 sub txn_begin {
1458   my $self = shift;
1459
1460   # this means we have not yet connected and do not know the AC status
1461   # (e.g. coderef $dbh)
1462   if (! defined $self->_dbh_autocommit) {
1463     $self->ensure_connected;
1464   }
1465   # otherwise re-connect on pid changes, so
1466   # that the txn_depth is adjusted properly
1467   # the lightweight _get_dbh is good enoug here
1468   # (only superficial handle check, no pings)
1469   else {
1470     $self->_get_dbh;
1471   }
1472
1473   if($self->transaction_depth == 0) {
1474     $self->debugobj->txn_begin()
1475       if $self->debug;
1476     $self->_dbh_begin_work;
1477   }
1478   elsif ($self->auto_savepoint) {
1479     $self->svp_begin;
1480   }
1481   $self->{transaction_depth}++;
1482 }
1483
1484 sub _dbh_begin_work {
1485   my $self = shift;
1486
1487   # if the user is utilizing txn_do - good for him, otherwise we need to
1488   # ensure that the $dbh is healthy on BEGIN.
1489   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1490   # will be replaced by a failure of begin_work itself (which will be
1491   # then retried on reconnect)
1492   if ($self->{_in_dbh_do}) {
1493     $self->_dbh->begin_work;
1494   } else {
1495     $self->dbh_do(sub { $_[1]->begin_work });
1496   }
1497 }
1498
1499 sub txn_commit {
1500   my $self = shift;
1501   if (! $self->_dbh) {
1502     $self->throw_exception('cannot COMMIT on a disconnected handle');
1503   }
1504   elsif ($self->{transaction_depth} == 1) {
1505     $self->debugobj->txn_commit()
1506       if ($self->debug);
1507     $self->_dbh_commit;
1508     $self->{transaction_depth} = 0
1509       if $self->_dbh_autocommit;
1510   }
1511   elsif($self->{transaction_depth} > 1) {
1512     $self->{transaction_depth}--;
1513     $self->svp_release
1514       if $self->auto_savepoint;
1515   }
1516   elsif (! $self->_dbh->FETCH('AutoCommit') ) {
1517
1518     carp "Storage transaction_depth $self->{transaction_depth} does not match "
1519         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1520
1521     $self->debugobj->txn_commit()
1522       if ($self->debug);
1523     $self->_dbh_commit;
1524     $self->{transaction_depth} = 0
1525       if $self->_dbh_autocommit;
1526   }
1527   else {
1528     $self->throw_exception( 'Refusing to commit without a started transaction' );
1529   }
1530 }
1531
1532 sub _dbh_commit {
1533   my $self = shift;
1534   my $dbh  = $self->_dbh
1535     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1536   $dbh->commit;
1537 }
1538
1539 sub txn_rollback {
1540   my $self = shift;
1541   my $dbh = $self->_dbh;
1542   try {
1543     if ($self->{transaction_depth} == 1) {
1544       $self->debugobj->txn_rollback()
1545         if ($self->debug);
1546       $self->{transaction_depth} = 0
1547         if $self->_dbh_autocommit;
1548       $self->_dbh_rollback;
1549     }
1550     elsif($self->{transaction_depth} > 1) {
1551       $self->{transaction_depth}--;
1552       if ($self->auto_savepoint) {
1553         $self->svp_rollback;
1554         $self->svp_release;
1555       }
1556     }
1557     else {
1558       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1559     }
1560   }
1561   catch {
1562     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1563
1564     if ($_ !~ /$exception_class/) {
1565       # ensure that a failed rollback resets the transaction depth
1566       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1567     }
1568
1569     $self->throw_exception($_)
1570   };
1571 }
1572
1573 sub _dbh_rollback {
1574   my $self = shift;
1575   my $dbh  = $self->_dbh
1576     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1577   $dbh->rollback;
1578 }
1579
1580 # This used to be the top-half of _execute.  It was split out to make it
1581 #  easier to override in NoBindVars without duping the rest.  It takes up
1582 #  all of _execute's args, and emits $sql, @bind.
1583 sub _prep_for_execute {
1584   my ($self, $op, $ident, $args) = @_;
1585
1586   my ($sql, @bind) = $self->sql_maker->$op(
1587     blessed($ident) ? $ident->from : $ident,
1588     @$args,
1589   );
1590
1591   my (@final_bind, $colinfos);
1592   my $resolve_bindinfo = sub {
1593     $colinfos ||= $self->_resolve_column_info($ident);
1594     if (my $col = $_[1]->{dbic_colname}) {
1595       $_[1]->{sqlt_datatype} ||= $colinfos->{$col}{data_type}
1596         if $colinfos->{$col}{data_type};
1597       $_[1]->{sqlt_size} ||= $colinfos->{$col}{size}
1598         if $colinfos->{$col}{size};
1599     }
1600     $_[1];
1601   };
1602
1603   for my $e (@{$args->[2]{bind}||[]}, @bind) {
1604     push @final_bind, [ do {
1605       if (ref $e ne 'ARRAY') {
1606         ({}, $e)
1607       }
1608       elsif (! defined $e->[0]) {
1609         ({}, $e->[1])
1610       }
1611       elsif (ref $e->[0] eq 'HASH') {
1612         (
1613           (first { $e->[0]{$_} } qw/dbd_attrs sqlt_datatype/) ? $e->[0] : $self->$resolve_bindinfo($e->[0]),
1614           $e->[1]
1615         )
1616       }
1617       elsif (ref $e->[0] eq 'SCALAR') {
1618         ( { sqlt_datatype => ${$e->[0]} }, $e->[1] )
1619       }
1620       else {
1621         ( $self->$resolve_bindinfo({ dbic_colname => $e->[0] }), $e->[1] )
1622       }
1623     }];
1624   }
1625
1626   ($sql, \@final_bind);
1627 }
1628
1629 sub _format_for_trace {
1630   #my ($self, $bind) = @_;
1631
1632   ### Turn @bind from something like this:
1633   ###   ( [ "artist", 1 ], [ \%attrs, 3 ] )
1634   ### to this:
1635   ###   ( "'1'", "'3'" )
1636
1637   map {
1638     defined( $_ && $_->[1] )
1639       ? qq{'$_->[1]'}
1640       : q{NULL}
1641   } @{$_[1] || []};
1642 }
1643
1644 sub _query_start {
1645   my ( $self, $sql, $bind ) = @_;
1646
1647   $self->debugobj->query_start( $sql, $self->_format_for_trace($bind) )
1648     if $self->debug;
1649 }
1650
1651 sub _query_end {
1652   my ( $self, $sql, $bind ) = @_;
1653
1654   $self->debugobj->query_end( $sql, $self->_format_for_trace($bind) )
1655     if $self->debug;
1656 }
1657
1658 my $sba_compat;
1659 sub _dbi_attrs_for_bind {
1660   my ($self, $ident, $bind) = @_;
1661
1662   if (! defined $sba_compat) {
1663     $self->_determine_driver;
1664     $sba_compat = $self->can('source_bind_attributes') == \&source_bind_attributes
1665       ? 0
1666       : 1
1667     ;
1668   }
1669
1670   my $sba_attrs;
1671   if ($sba_compat) {
1672     my $class = ref $self;
1673     carp_unique (
1674       "The source_bind_attributes() override in $class relies on a deprecated codepath. "
1675      .'You are strongly advised to switch your code to override bind_attribute_by_datatype() '
1676      .'instead. This legacy compat shim will also disappear some time before DBIC 0.09'
1677     );
1678
1679     my $sba_attrs = $self->source_bind_attributes
1680   }
1681
1682   my @attrs;
1683
1684   for (map { $_->[0] } @$bind) {
1685     push @attrs, do {
1686       if ($_->{dbd_attrs}) {
1687         $_->{dbd_attrs}
1688       }
1689       elsif($_->{sqlt_datatype}) {
1690         $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
1691       }
1692       elsif ($sba_attrs and $_->{dbic_colname}) {
1693         $sba_attrs->{$_->{dbic_colname}} || undef;
1694       }
1695       else {
1696         undef;  # always push something at this position
1697       }
1698     }
1699   }
1700
1701   return \@attrs;
1702 }
1703
1704 sub _execute {
1705   my ($self, $op, $ident, @args) = @_;
1706
1707   my ($sql, $bind) = $self->_prep_for_execute($op, $ident, \@args);
1708
1709   shift->dbh_do(    # retry over disconnects
1710     '_dbh_execute',
1711     $sql,
1712     $bind,
1713     $self->_dbi_attrs_for_bind($ident, $bind)
1714   );
1715 }
1716
1717 sub _dbh_execute {
1718   my ($self, undef, $sql, $bind, $bind_attrs) = @_;
1719
1720   $self->_query_start( $sql, $bind );
1721   my $sth = $self->_sth($sql);
1722
1723   for my $i (0 .. $#$bind) {
1724     if (ref $bind->[$i][1] eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1725       $sth->bind_param_inout(
1726         $i + 1, # bind params counts are 1-based
1727         $bind->[$i][1],
1728         $bind->[$i][0]{dbd_size} || $self->_max_column_bytesize($bind->[$i][0]), # size
1729         $bind_attrs->[$i],
1730       );
1731     }
1732     else {
1733       $sth->bind_param(
1734         $i + 1,
1735         (ref $bind->[$i][1] and overload::Method($bind->[$i][1], '""'))
1736           ? "$bind->[$i][1]"
1737           : $bind->[$i][1]
1738         ,
1739         $bind_attrs->[$i],
1740       );
1741     }
1742   }
1743
1744   # Can this fail without throwing an exception anyways???
1745   my $rv = $sth->execute();
1746   $self->throw_exception(
1747     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1748   ) if !$rv;
1749
1750   $self->_query_end( $sql, $bind );
1751
1752   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1753 }
1754
1755 sub _prefetch_autovalues {
1756   my ($self, $source, $to_insert) = @_;
1757
1758   my $colinfo = $source->columns_info;
1759
1760   my %values;
1761   for my $col (keys %$colinfo) {
1762     if (
1763       $colinfo->{$col}{auto_nextval}
1764         and
1765       (
1766         ! exists $to_insert->{$col}
1767           or
1768         ref $to_insert->{$col} eq 'SCALAR'
1769           or
1770         (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1771       )
1772     ) {
1773       $values{$col} = $self->_sequence_fetch(
1774         'NEXTVAL',
1775         ( $colinfo->{$col}{sequence} ||=
1776             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1777         ),
1778       );
1779     }
1780   }
1781
1782   \%values;
1783 }
1784
1785 sub insert {
1786   my ($self, $source, $to_insert) = @_;
1787
1788   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1789
1790   # fuse the values, but keep a separate list of prefetched_values so that
1791   # they can be fused once again with the final return
1792   $to_insert = { %$to_insert, %$prefetched_values };
1793
1794   my $col_infos = $source->columns_info;
1795   my %pcols = map { $_ => 1 } $source->primary_columns;
1796   my %retrieve_cols;
1797   for my $col ($source->columns) {
1798     # nothing to retrieve when explicit values are supplied
1799     next if (defined $to_insert->{$col} and ! (
1800       ref $to_insert->{$col} eq 'SCALAR'
1801         or
1802       (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1803     ));
1804
1805     # the 'scalar keys' is a trick to preserve the ->columns declaration order
1806     $retrieve_cols{$col} = scalar keys %retrieve_cols if (
1807       $pcols{$col}
1808         or
1809       $col_infos->{$col}{retrieve_on_insert}
1810     );
1811   };
1812
1813   my ($sqla_opts, @ir_container);
1814   if (%retrieve_cols and $self->_use_insert_returning) {
1815     $sqla_opts->{returning_container} = \@ir_container
1816       if $self->_use_insert_returning_bound;
1817
1818     $sqla_opts->{returning} = [
1819       sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
1820     ];
1821   }
1822
1823   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
1824
1825   my %returned_cols = %$to_insert;
1826   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
1827     @ir_container = try {
1828       local $SIG{__WARN__} = sub {};
1829       my @r = $sth->fetchrow_array;
1830       $sth->finish;
1831       @r;
1832     } unless @ir_container;
1833
1834     @returned_cols{@$retlist} = @ir_container if @ir_container;
1835   }
1836   else {
1837     # pull in PK if needed and then everything else
1838     if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
1839
1840       $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
1841         unless $self->can('last_insert_id');
1842
1843       my @pri_values = $self->last_insert_id($source, @missing_pri);
1844
1845       $self->throw_exception( "Can't get last insert id" )
1846         unless (@pri_values == @missing_pri);
1847
1848       @returned_cols{@missing_pri} = @pri_values;
1849       delete $retrieve_cols{$_} for @missing_pri;
1850     }
1851
1852     # if there is more left to pull
1853     if (%retrieve_cols) {
1854       $self->throw_exception(
1855         'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
1856       ) unless %pcols;
1857
1858       my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
1859
1860       my $cur = DBIx::Class::ResultSet->new($source, {
1861         where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
1862         select => \@left_to_fetch,
1863       })->cursor;
1864
1865       @returned_cols{@left_to_fetch} = $cur->next;
1866
1867       $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
1868         if scalar $cur->next;
1869     }
1870   }
1871
1872   return { %$prefetched_values, %returned_cols };
1873 }
1874
1875 sub insert_bulk {
1876   my ($self, $source, $cols, $data) = @_;
1877
1878   # FIXME - perhaps this is not even needed? does DBI stringify?
1879   #
1880   # forcibly stringify whatever is stringifiable
1881   for my $r (0 .. $#$data) {
1882     for my $c (0 .. $#{$data->[$r]}) {
1883       $data->[$r][$c] = "$data->[$r][$c]"
1884         if ( ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
1885     }
1886   }
1887
1888   # check the data for consistency
1889   # report a sensible error on bad data
1890   #
1891   # also create a list of dynamic binds (ones that will be changing
1892   # for each row)
1893   my $dyn_bind_idx;
1894   for my $col_idx (0..$#$cols) {
1895
1896     # the first "row" is used as a point of reference
1897     my $reference_val = $data->[0][$col_idx];
1898     my $is_literal = ref $reference_val eq 'SCALAR';
1899     my $is_literal_bind = ( !$is_literal and (
1900       ref $reference_val eq 'REF'
1901         and
1902       ref $$reference_val eq 'ARRAY'
1903     ) );
1904
1905     $dyn_bind_idx->{$col_idx} = 1
1906       if (!$is_literal and !$is_literal_bind);
1907
1908     # use a closure for convenience (less to pass)
1909     my $bad_slice = sub {
1910       my ($msg, $slice_idx) = @_;
1911       $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1912         $msg,
1913         $cols->[$col_idx],
1914         do {
1915           require Data::Dumper::Concise;
1916           local $Data::Dumper::Maxdepth = 2;
1917           Data::Dumper::Concise::Dumper ({
1918             map { $cols->[$_] =>
1919               $data->[$slice_idx][$_]
1920             } (0 .. $#$cols)
1921           }),
1922         }
1923       );
1924     };
1925
1926     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
1927       my $val = $data->[$row_idx][$col_idx];
1928
1929       if ($is_literal) {
1930         if (ref $val ne 'SCALAR') {
1931           $bad_slice->(
1932             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
1933             $row_idx
1934           );
1935         }
1936         elsif ($$val ne $$reference_val) {
1937           $bad_slice->(
1938             "Inconsistent literal SQL value (expecting \\'$$reference_val')",
1939             $row_idx
1940           );
1941         }
1942       }
1943       elsif ($is_literal_bind) {
1944         if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
1945           $bad_slice->(
1946             "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
1947             $row_idx
1948           );
1949         }
1950         elsif (${$val}->[0] ne ${$reference_val}->[0]) {
1951           $bad_slice->(
1952             "Inconsistent literal SQL-bind value (expecting \\['${$reference_val}->[0]', ... ])",
1953             $row_idx
1954           );
1955         }
1956       }
1957       elsif (ref $val) {
1958         if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
1959           $bad_slice->("Literal SQL found where a plain bind value is expected", $row_idx);
1960         }
1961         else {
1962           $bad_slice->("$val reference found where bind expected", $row_idx);
1963         }
1964       }
1965     }
1966   }
1967
1968   # Get the sql with bind values interpolated where necessary. For dynamic
1969   # binds convert the values of the first row into a literal+bind combo, with
1970   # extra positional info in the bind attr hashref. This will allow us to match
1971   # the order properly, and is so contrived because a user-supplied literal
1972   # bind (or something else specific to a resultsource and/or storage driver)
1973   # can inject extra binds along the way, so one can't rely on "shift
1974   # positions" ordering at all. Also we can't just hand SQLA a set of some
1975   # known "values" (e.g. hashrefs that can be later matched up by address),
1976   # because we want to supply a real value on which perhaps e.g. datatype
1977   # checks will be performed
1978   my ($sql, $proto_bind) = $self->_prep_for_execute (
1979     'insert',
1980     $source,
1981     [ { map { $cols->[$_] => $dyn_bind_idx->{$_}
1982       ? \[ '?', [
1983           { dbic_colname => $cols->[$_], _bind_data_slice_idx => $_ }
1984             =>
1985           $data->[0][$_]
1986         ] ]
1987       : $data->[0][$_]
1988     } (0..$#$cols) } ],
1989   );
1990
1991   if (! @$proto_bind and keys %$dyn_bind_idx) {
1992     # if the bindlist is empty and we had some dynamic binds, this means the
1993     # storage ate them away (e.g. the NoBindVars component) and interpolated
1994     # them directly into the SQL. This obviosly can't be good for multi-inserts
1995     $self->throw_exception('Cannot insert_bulk without support for placeholders');
1996   }
1997
1998   # neither _execute_array, nor _execute_inserts_with_no_binds are
1999   # atomic (even if _execute _array is a single call). Thus a safety
2000   # scope guard
2001   my $guard = $self->txn_scope_guard;
2002
2003   $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
2004   my $sth = $self->_sth($sql);
2005   my $rv = do {
2006     if (@$proto_bind) {
2007       # proto bind contains the information on which pieces of $data to pull
2008       # $cols is passed in only for prettier error-reporting
2009       $self->_execute_array( $source, $sth, $proto_bind, $cols, $data );
2010     }
2011     else {
2012       # bind_param_array doesn't work if there are no binds
2013       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
2014     }
2015   };
2016
2017   $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
2018
2019   $guard->commit;
2020
2021   return (wantarray ? ($rv, $sth, @$proto_bind) : $rv);
2022 }
2023
2024 sub _execute_array {
2025   my ($self, $source, $sth, $proto_bind, $cols, $data, @extra) = @_;
2026
2027   ## This must be an arrayref, else nothing works!
2028   my $tuple_status = [];
2029
2030   my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
2031
2032   # Bind the values by column slices
2033   for my $i (0 .. $#$proto_bind) {
2034     my $data_slice_idx = (
2035       ref $proto_bind->[$i][0] eq 'HASH'
2036         and
2037       exists $proto_bind->[$i][0]{_bind_data_slice_idx}
2038     ) ? $proto_bind->[$i][0]{_bind_data_slice_idx} : undef;
2039
2040     $sth->bind_param_array(
2041       $i+1, # DBI bind indexes are 1-based
2042       defined $data_slice_idx
2043         # either get a "column" of dynamic values, or just repeat the same
2044         # bind over and over
2045         ? [ map { $_->[$data_slice_idx] } @$data ]
2046         : [ ($proto_bind->[$i][1]) x @$data ]
2047       ,
2048       defined $bind_attrs->[$i] ? $bind_attrs->[$i] : (), # some DBDs throw up when given an undef
2049     );
2050   }
2051
2052   my ($rv, $err);
2053   try {
2054     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
2055   }
2056   catch {
2057     $err = shift;
2058   };
2059
2060   # Not all DBDs are create equal. Some throw on error, some return
2061   # an undef $rv, and some set $sth->err - try whatever we can
2062   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
2063     ! defined $err
2064       and
2065     ( !defined $rv or $sth->err )
2066   );
2067
2068   # Statement must finish even if there was an exception.
2069   try {
2070     $sth->finish
2071   }
2072   catch {
2073     $err = shift unless defined $err
2074   };
2075
2076   if (defined $err) {
2077     my $i = 0;
2078     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
2079
2080     $self->throw_exception("Unexpected populate error: $err")
2081       if ($i > $#$tuple_status);
2082
2083     require Data::Dumper::Concise;
2084     $self->throw_exception(sprintf "execute_array() aborted with '%s' at populate slice:\n%s",
2085       ($tuple_status->[$i][1] || $err),
2086       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
2087     );
2088   }
2089
2090   return $rv;
2091 }
2092
2093 sub _dbh_execute_array {
2094   #my ($self, $sth, $tuple_status, @extra) = @_;
2095   return $_[1]->execute_array({ArrayTupleStatus => $_[2]});
2096 }
2097
2098 sub _dbh_execute_inserts_with_no_binds {
2099   my ($self, $sth, $count) = @_;
2100
2101   my $err;
2102   try {
2103     my $dbh = $self->_get_dbh;
2104     local $dbh->{RaiseError} = 1;
2105     local $dbh->{PrintError} = 0;
2106
2107     $sth->execute foreach 1..$count;
2108   }
2109   catch {
2110     $err = shift;
2111   };
2112
2113   # Make sure statement is finished even if there was an exception.
2114   try {
2115     $sth->finish
2116   }
2117   catch {
2118     $err = shift unless defined $err;
2119   };
2120
2121   $self->throw_exception($err) if defined $err;
2122
2123   return $count;
2124 }
2125
2126 sub update {
2127   #my ($self, $source, @args) = @_;
2128   shift->_execute('update', @_);
2129 }
2130
2131
2132 sub delete {
2133   #my ($self, $source, @args) = @_;
2134   shift->_execute('delete', @_);
2135 }
2136
2137 # We were sent here because the $rs contains a complex search
2138 # which will require a subquery to select the correct rows
2139 # (i.e. joined or limited resultsets, or non-introspectable conditions)
2140 #
2141 # Generating a single PK column subquery is trivial and supported
2142 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
2143 # Look at _multipk_update_delete()
2144 sub _subq_update_delete {
2145   my $self = shift;
2146   my ($rs, $op, $values) = @_;
2147
2148   my $rsrc = $rs->result_source;
2149
2150   # quick check if we got a sane rs on our hands
2151   my @pcols = $rsrc->_pri_cols;
2152
2153   my $sel = $rs->_resolved_attrs->{select};
2154   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
2155
2156   if (
2157       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
2158         ne
2159       join ("\x00", sort @$sel )
2160   ) {
2161     $self->throw_exception (
2162       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
2163     );
2164   }
2165
2166   if (@pcols == 1) {
2167     return $self->$op (
2168       $rsrc,
2169       $op eq 'update' ? $values : (),
2170       { $pcols[0] => { -in => $rs->as_query } },
2171     );
2172   }
2173
2174   else {
2175     return $self->_multipk_update_delete (@_);
2176   }
2177 }
2178
2179 # ANSI SQL does not provide a reliable way to perform a multicol-PK
2180 # resultset update/delete involving subqueries. So by default resort
2181 # to simple (and inefficient) delete_all style per-row opearations,
2182 # while allowing specific storages to override this with a faster
2183 # implementation.
2184 #
2185 sub _multipk_update_delete {
2186   return shift->_per_row_update_delete (@_);
2187 }
2188
2189 # This is the default loop used to delete/update rows for multi PK
2190 # resultsets, and used by mysql exclusively (because it can't do anything
2191 # else).
2192 #
2193 # We do not use $row->$op style queries, because resultset update/delete
2194 # is not expected to cascade (this is what delete_all/update_all is for).
2195 #
2196 # There should be no race conditions as the entire operation is rolled
2197 # in a transaction.
2198 #
2199 sub _per_row_update_delete {
2200   my $self = shift;
2201   my ($rs, $op, $values) = @_;
2202
2203   my $rsrc = $rs->result_source;
2204   my @pcols = $rsrc->_pri_cols;
2205
2206   my $guard = $self->txn_scope_guard;
2207
2208   # emulate the return value of $sth->execute for non-selects
2209   my $row_cnt = '0E0';
2210
2211   my $subrs_cur = $rs->cursor;
2212   my @all_pk = $subrs_cur->all;
2213   for my $pks ( @all_pk) {
2214
2215     my $cond;
2216     for my $i (0.. $#pcols) {
2217       $cond->{$pcols[$i]} = $pks->[$i];
2218     }
2219
2220     $self->$op (
2221       $rsrc,
2222       $op eq 'update' ? $values : (),
2223       $cond,
2224     );
2225
2226     $row_cnt++;
2227   }
2228
2229   $guard->commit;
2230
2231   return $row_cnt;
2232 }
2233
2234 sub _select {
2235   my $self = shift;
2236   $self->_execute($self->_select_args(@_));
2237 }
2238
2239 sub _select_args_to_query {
2240   my $self = shift;
2241
2242   # my ($op, $ident, $select, $cond, $rs_attrs, $rows, $offset)
2243   #  = $self->_select_args($ident, $select, $cond, $attrs);
2244   my ($op, $ident, @args) =
2245     $self->_select_args(@_);
2246
2247   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2248   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $ident, \@args);
2249   $prepared_bind ||= [];
2250
2251   return wantarray
2252     ? ($sql, $prepared_bind)
2253     : \[ "($sql)", @$prepared_bind ]
2254   ;
2255 }
2256
2257 sub _select_args {
2258   my ($self, $ident, $select, $where, $attrs) = @_;
2259
2260   my $sql_maker = $self->sql_maker;
2261   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2262
2263   $attrs = {
2264     %$attrs,
2265     select => $select,
2266     from => $ident,
2267     where => $where,
2268     $rs_alias && $alias2source->{$rs_alias}
2269       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2270       : ()
2271     ,
2272   };
2273
2274   # Sanity check the attributes (SQLMaker does it too, but
2275   # in case of a software_limit we'll never reach there)
2276   if (defined $attrs->{offset}) {
2277     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2278       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2279   }
2280
2281   if (defined $attrs->{rows}) {
2282     $self->throw_exception("The rows attribute must be a positive integer if present")
2283       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2284   }
2285   elsif ($attrs->{offset}) {
2286     # MySQL actually recommends this approach.  I cringe.
2287     $attrs->{rows} = $sql_maker->__max_int;
2288   }
2289
2290   my @limit;
2291
2292   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2293   # storage, unless software limit was requested
2294   if (
2295     #limited has_many
2296     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2297        ||
2298     # grouped prefetch (to satisfy group_by == select)
2299     ( $attrs->{group_by}
2300         &&
2301       @{$attrs->{group_by}}
2302         &&
2303       $attrs->{_prefetch_selector_range}
2304     )
2305   ) {
2306     ($ident, $select, $where, $attrs)
2307       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2308   }
2309   elsif (! $attrs->{software_limit} ) {
2310     push @limit, (
2311       $attrs->{rows} || (),
2312       $attrs->{offset} || (),
2313     );
2314   }
2315
2316   # try to simplify the joinmap further (prune unreferenced type-single joins)
2317   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2318
2319 ###
2320   # This would be the point to deflate anything found in $where
2321   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2322   # expect a row object. And all we have is a resultsource (it is trivial
2323   # to extract deflator coderefs via $alias2source above).
2324   #
2325   # I don't see a way forward other than changing the way deflators are
2326   # invoked, and that's just bad...
2327 ###
2328
2329   return ('select', $ident, $select, $where, $attrs, @limit);
2330 }
2331
2332 # Returns a counting SELECT for a simple count
2333 # query. Abstracted so that a storage could override
2334 # this to { count => 'firstcol' } or whatever makes
2335 # sense as a performance optimization
2336 sub _count_select {
2337   #my ($self, $source, $rs_attrs) = @_;
2338   return { count => '*' };
2339 }
2340
2341 sub source_bind_attributes {
2342   shift->throw_exception(
2343     'source_bind_attributes() was never meant to be a callable public method - '
2344    .'please contact the DBIC dev-team and describe your use case so that a reasonable '
2345    .'solution can be provided'
2346    ."\nhttp://search.cpan.org/dist/DBIx-Class/lib/DBIx/Class.pm#GETTING_HELP/SUPPORT"
2347   );
2348 }
2349
2350 =head2 select
2351
2352 =over 4
2353
2354 =item Arguments: $ident, $select, $condition, $attrs
2355
2356 =back
2357
2358 Handle a SQL select statement.
2359
2360 =cut
2361
2362 sub select {
2363   my $self = shift;
2364   my ($ident, $select, $condition, $attrs) = @_;
2365   return $self->cursor_class->new($self, \@_, $attrs);
2366 }
2367
2368 sub select_single {
2369   my $self = shift;
2370   my ($rv, $sth, @bind) = $self->_select(@_);
2371   my @row = $sth->fetchrow_array;
2372   my @nextrow = $sth->fetchrow_array if @row;
2373   if(@row && @nextrow) {
2374     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2375   }
2376   # Need to call finish() to work round broken DBDs
2377   $sth->finish();
2378   return @row;
2379 }
2380
2381 =head2 sql_limit_dialect
2382
2383 This is an accessor for the default SQL limit dialect used by a particular
2384 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2385 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2386 see L<DBIx::Class::SQLMaker::LimitDialects>.
2387
2388 =head2 sth
2389
2390 =over 4
2391
2392 =item Arguments: $sql
2393
2394 =back
2395
2396 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2397
2398 =cut
2399
2400 sub _dbh_sth {
2401   my ($self, $dbh, $sql) = @_;
2402
2403   # 3 is the if_active parameter which avoids active sth re-use
2404   my $sth = $self->disable_sth_caching
2405     ? $dbh->prepare($sql)
2406     : $dbh->prepare_cached($sql, {}, 3);
2407
2408   # XXX You would think RaiseError would make this impossible,
2409   #  but apparently that's not true :(
2410   $self->throw_exception(
2411     $dbh->errstr
2412       ||
2413     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
2414             .'an exception and/or setting $dbh->errstr',
2415       length ($sql) > 20
2416         ? substr($sql, 0, 20) . '...'
2417         : $sql
2418       ,
2419       'DBD::' . $dbh->{Driver}{Name},
2420     )
2421   ) if !$sth;
2422
2423   $sth;
2424 }
2425
2426 sub sth {
2427   carp_unique 'sth was mistakenly marked/documented as public, stop calling it (will be removed before DBIC v0.09)';
2428   shift->_sth(@_);
2429 }
2430
2431 sub _sth {
2432   my ($self, $sql) = @_;
2433   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2434 }
2435
2436 sub _dbh_columns_info_for {
2437   my ($self, $dbh, $table) = @_;
2438
2439   if ($dbh->can('column_info')) {
2440     my %result;
2441     my $caught;
2442     try {
2443       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2444       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2445       $sth->execute();
2446       while ( my $info = $sth->fetchrow_hashref() ){
2447         my %column_info;
2448         $column_info{data_type}   = $info->{TYPE_NAME};
2449         $column_info{size}      = $info->{COLUMN_SIZE};
2450         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2451         $column_info{default_value} = $info->{COLUMN_DEF};
2452         my $col_name = $info->{COLUMN_NAME};
2453         $col_name =~ s/^\"(.*)\"$/$1/;
2454
2455         $result{$col_name} = \%column_info;
2456       }
2457     } catch {
2458       $caught = 1;
2459     };
2460     return \%result if !$caught && scalar keys %result;
2461   }
2462
2463   my %result;
2464   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2465   $sth->execute;
2466   my @columns = @{$sth->{NAME_lc}};
2467   for my $i ( 0 .. $#columns ){
2468     my %column_info;
2469     $column_info{data_type} = $sth->{TYPE}->[$i];
2470     $column_info{size} = $sth->{PRECISION}->[$i];
2471     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2472
2473     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2474       $column_info{data_type} = $1;
2475       $column_info{size}    = $2;
2476     }
2477
2478     $result{$columns[$i]} = \%column_info;
2479   }
2480   $sth->finish;
2481
2482   foreach my $col (keys %result) {
2483     my $colinfo = $result{$col};
2484     my $type_num = $colinfo->{data_type};
2485     my $type_name;
2486     if(defined $type_num && $dbh->can('type_info')) {
2487       my $type_info = $dbh->type_info($type_num);
2488       $type_name = $type_info->{TYPE_NAME} if $type_info;
2489       $colinfo->{data_type} = $type_name if $type_name;
2490     }
2491   }
2492
2493   return \%result;
2494 }
2495
2496 sub columns_info_for {
2497   my ($self, $table) = @_;
2498   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2499 }
2500
2501 =head2 last_insert_id
2502
2503 Return the row id of the last insert.
2504
2505 =cut
2506
2507 sub _dbh_last_insert_id {
2508     my ($self, $dbh, $source, $col) = @_;
2509
2510     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2511
2512     return $id if defined $id;
2513
2514     my $class = ref $self;
2515     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2516 }
2517
2518 sub last_insert_id {
2519   my $self = shift;
2520   $self->_dbh_last_insert_id ($self->_dbh, @_);
2521 }
2522
2523 =head2 _native_data_type
2524
2525 =over 4
2526
2527 =item Arguments: $type_name
2528
2529 =back
2530
2531 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2532 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2533 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2534
2535 The default implementation returns C<undef>, implement in your Storage driver if
2536 you need this functionality.
2537
2538 Should map types from other databases to the native RDBMS type, for example
2539 C<VARCHAR2> to C<VARCHAR>.
2540
2541 Types with modifiers should map to the underlying data type. For example,
2542 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2543
2544 Composite types should map to the container type, for example
2545 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2546
2547 =cut
2548
2549 sub _native_data_type {
2550   #my ($self, $data_type) = @_;
2551   return undef
2552 }
2553
2554 # Check if placeholders are supported at all
2555 sub _determine_supports_placeholders {
2556   my $self = shift;
2557   my $dbh  = $self->_get_dbh;
2558
2559   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2560   # but it is inaccurate more often than not
2561   return try {
2562     local $dbh->{PrintError} = 0;
2563     local $dbh->{RaiseError} = 1;
2564     $dbh->do('select ?', {}, 1);
2565     1;
2566   }
2567   catch {
2568     0;
2569   };
2570 }
2571
2572 # Check if placeholders bound to non-string types throw exceptions
2573 #
2574 sub _determine_supports_typeless_placeholders {
2575   my $self = shift;
2576   my $dbh  = $self->_get_dbh;
2577
2578   return try {
2579     local $dbh->{PrintError} = 0;
2580     local $dbh->{RaiseError} = 1;
2581     # this specifically tests a bind that is NOT a string
2582     $dbh->do('select 1 where 1 = ?', {}, 1);
2583     1;
2584   }
2585   catch {
2586     0;
2587   };
2588 }
2589
2590 =head2 sqlt_type
2591
2592 Returns the database driver name.
2593
2594 =cut
2595
2596 sub sqlt_type {
2597   shift->_get_dbh->{Driver}->{Name};
2598 }
2599
2600 =head2 bind_attribute_by_data_type
2601
2602 Given a datatype from column info, returns a database specific bind
2603 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2604 let the database planner just handle it.
2605
2606 Generally only needed for special case column types, like bytea in postgres.
2607
2608 =cut
2609
2610 sub bind_attribute_by_data_type {
2611     return;
2612 }
2613
2614 =head2 is_datatype_numeric
2615
2616 Given a datatype from column_info, returns a boolean value indicating if
2617 the current RDBMS considers it a numeric value. This controls how
2618 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2619 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2620 be performed instead of the usual C<eq>.
2621
2622 =cut
2623
2624 sub is_datatype_numeric {
2625   #my ($self, $dt) = @_;
2626
2627   return 0 unless $_[1];
2628
2629   $_[1] =~ /^ (?:
2630     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2631   ) $/ix;
2632 }
2633
2634
2635 =head2 create_ddl_dir
2636
2637 =over 4
2638
2639 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2640
2641 =back
2642
2643 Creates a SQL file based on the Schema, for each of the specified
2644 database engines in C<\@databases> in the given directory.
2645 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2646
2647 Given a previous version number, this will also create a file containing
2648 the ALTER TABLE statements to transform the previous schema into the
2649 current one. Note that these statements may contain C<DROP TABLE> or
2650 C<DROP COLUMN> statements that can potentially destroy data.
2651
2652 The file names are created using the C<ddl_filename> method below, please
2653 override this method in your schema if you would like a different file
2654 name format. For the ALTER file, the same format is used, replacing
2655 $version in the name with "$preversion-$version".
2656
2657 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2658 The most common value for this would be C<< { add_drop_table => 1 } >>
2659 to have the SQL produced include a C<DROP TABLE> statement for each table
2660 created. For quoting purposes supply C<quote_table_names> and
2661 C<quote_field_names>.
2662
2663 If no arguments are passed, then the following default values are assumed:
2664
2665 =over 4
2666
2667 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2668
2669 =item version    - $schema->schema_version
2670
2671 =item directory  - './'
2672
2673 =item preversion - <none>
2674
2675 =back
2676
2677 By default, C<\%sqlt_args> will have
2678
2679  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2680
2681 merged with the hash passed in. To disable any of those features, pass in a
2682 hashref like the following
2683
2684  { ignore_constraint_names => 0, # ... other options }
2685
2686
2687 WARNING: You are strongly advised to check all SQL files created, before applying
2688 them.
2689
2690 =cut
2691
2692 sub create_ddl_dir {
2693   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2694
2695   unless ($dir) {
2696     carp "No directory given, using ./\n";
2697     $dir = './';
2698   } else {
2699       -d $dir
2700         or
2701       (require File::Path and File::Path::make_path ("$dir"))  # make_path does not like objects (i.e. Path::Class::Dir)
2702         or
2703       $self->throw_exception(
2704         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2705       );
2706   }
2707
2708   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2709
2710   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2711   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2712
2713   my $schema_version = $schema->schema_version || '1.x';
2714   $version ||= $schema_version;
2715
2716   $sqltargs = {
2717     add_drop_table => 1,
2718     ignore_constraint_names => 1,
2719     ignore_index_names => 1,
2720     %{$sqltargs || {}}
2721   };
2722
2723   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2724     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2725   }
2726
2727   my $sqlt = SQL::Translator->new( $sqltargs );
2728
2729   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2730   my $sqlt_schema = $sqlt->translate({ data => $schema })
2731     or $self->throw_exception ($sqlt->error);
2732
2733   foreach my $db (@$databases) {
2734     $sqlt->reset();
2735     $sqlt->{schema} = $sqlt_schema;
2736     $sqlt->producer($db);
2737
2738     my $file;
2739     my $filename = $schema->ddl_filename($db, $version, $dir);
2740     if (-e $filename && ($version eq $schema_version )) {
2741       # if we are dumping the current version, overwrite the DDL
2742       carp "Overwriting existing DDL file - $filename";
2743       unlink($filename);
2744     }
2745
2746     my $output = $sqlt->translate;
2747     if(!$output) {
2748       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2749       next;
2750     }
2751     if(!open($file, ">$filename")) {
2752       $self->throw_exception("Can't open $filename for writing ($!)");
2753       next;
2754     }
2755     print $file $output;
2756     close($file);
2757
2758     next unless ($preversion);
2759
2760     require SQL::Translator::Diff;
2761
2762     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2763     if(!-e $prefilename) {
2764       carp("No previous schema file found ($prefilename)");
2765       next;
2766     }
2767
2768     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2769     if(-e $difffile) {
2770       carp("Overwriting existing diff file - $difffile");
2771       unlink($difffile);
2772     }
2773
2774     my $source_schema;
2775     {
2776       my $t = SQL::Translator->new($sqltargs);
2777       $t->debug( 0 );
2778       $t->trace( 0 );
2779
2780       $t->parser( $db )
2781         or $self->throw_exception ($t->error);
2782
2783       my $out = $t->translate( $prefilename )
2784         or $self->throw_exception ($t->error);
2785
2786       $source_schema = $t->schema;
2787
2788       $source_schema->name( $prefilename )
2789         unless ( $source_schema->name );
2790     }
2791
2792     # The "new" style of producers have sane normalization and can support
2793     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2794     # And we have to diff parsed SQL against parsed SQL.
2795     my $dest_schema = $sqlt_schema;
2796
2797     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2798       my $t = SQL::Translator->new($sqltargs);
2799       $t->debug( 0 );
2800       $t->trace( 0 );
2801
2802       $t->parser( $db )
2803         or $self->throw_exception ($t->error);
2804
2805       my $out = $t->translate( $filename )
2806         or $self->throw_exception ($t->error);
2807
2808       $dest_schema = $t->schema;
2809
2810       $dest_schema->name( $filename )
2811         unless $dest_schema->name;
2812     }
2813
2814     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2815                                                   $dest_schema,   $db,
2816                                                   $sqltargs
2817                                                  );
2818     if(!open $file, ">$difffile") {
2819       $self->throw_exception("Can't write to $difffile ($!)");
2820       next;
2821     }
2822     print $file $diff;
2823     close($file);
2824   }
2825 }
2826
2827 =head2 deployment_statements
2828
2829 =over 4
2830
2831 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2832
2833 =back
2834
2835 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2836
2837 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2838 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2839
2840 C<$directory> is used to return statements from files in a previously created
2841 L</create_ddl_dir> directory and is optional. The filenames are constructed
2842 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2843
2844 If no C<$directory> is specified then the statements are constructed on the
2845 fly using L<SQL::Translator> and C<$version> is ignored.
2846
2847 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2848
2849 =cut
2850
2851 sub deployment_statements {
2852   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2853   $type ||= $self->sqlt_type;
2854   $version ||= $schema->schema_version || '1.x';
2855   $dir ||= './';
2856   my $filename = $schema->ddl_filename($type, $version, $dir);
2857   if(-f $filename)
2858   {
2859       # FIXME replace this block when a proper sane sql parser is available
2860       my $file;
2861       open($file, "<$filename")
2862         or $self->throw_exception("Can't open $filename ($!)");
2863       my @rows = <$file>;
2864       close($file);
2865       return join('', @rows);
2866   }
2867
2868   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2869     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2870   }
2871
2872   # sources needs to be a parser arg, but for simplicty allow at top level
2873   # coming in
2874   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2875       if exists $sqltargs->{sources};
2876
2877   my $tr = SQL::Translator->new(
2878     producer => "SQL::Translator::Producer::${type}",
2879     %$sqltargs,
2880     parser => 'SQL::Translator::Parser::DBIx::Class',
2881     data => $schema,
2882   );
2883
2884   my @ret;
2885   if (wantarray) {
2886     @ret = $tr->translate;
2887   }
2888   else {
2889     $ret[0] = $tr->translate;
2890   }
2891
2892   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2893     unless (@ret && defined $ret[0]);
2894
2895   return wantarray ? @ret : $ret[0];
2896 }
2897
2898 # FIXME deploy() currently does not accurately report sql errors
2899 # Will always return true while errors are warned
2900 sub deploy {
2901   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2902   my $deploy = sub {
2903     my $line = shift;
2904     return if(!$line);
2905     return if($line =~ /^--/);
2906     # next if($line =~ /^DROP/m);
2907     return if($line =~ /^BEGIN TRANSACTION/m);
2908     return if($line =~ /^COMMIT/m);
2909     return if $line =~ /^\s+$/; # skip whitespace only
2910     $self->_query_start($line);
2911     try {
2912       # do a dbh_do cycle here, as we need some error checking in
2913       # place (even though we will ignore errors)
2914       $self->dbh_do (sub { $_[1]->do($line) });
2915     } catch {
2916       carp qq{$_ (running "${line}")};
2917     };
2918     $self->_query_end($line);
2919   };
2920   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2921   if (@statements > 1) {
2922     foreach my $statement (@statements) {
2923       $deploy->( $statement );
2924     }
2925   }
2926   elsif (@statements == 1) {
2927     # split on single line comments and end of statements
2928     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
2929       $deploy->( $line );
2930     }
2931   }
2932 }
2933
2934 =head2 datetime_parser
2935
2936 Returns the datetime parser class
2937
2938 =cut
2939
2940 sub datetime_parser {
2941   my $self = shift;
2942   return $self->{datetime_parser} ||= do {
2943     $self->build_datetime_parser(@_);
2944   };
2945 }
2946
2947 =head2 datetime_parser_type
2948
2949 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
2950
2951 =head2 build_datetime_parser
2952
2953 See L</datetime_parser>
2954
2955 =cut
2956
2957 sub build_datetime_parser {
2958   my $self = shift;
2959   my $type = $self->datetime_parser_type(@_);
2960   return $type;
2961 }
2962
2963
2964 =head2 is_replicating
2965
2966 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2967 replicate from a master database.  Default is undef, which is the result
2968 returned by databases that don't support replication.
2969
2970 =cut
2971
2972 sub is_replicating {
2973     return;
2974
2975 }
2976
2977 =head2 lag_behind_master
2978
2979 Returns a number that represents a certain amount of lag behind a master db
2980 when a given storage is replicating.  The number is database dependent, but
2981 starts at zero and increases with the amount of lag. Default in undef
2982
2983 =cut
2984
2985 sub lag_behind_master {
2986     return;
2987 }
2988
2989 =head2 relname_to_table_alias
2990
2991 =over 4
2992
2993 =item Arguments: $relname, $join_count
2994
2995 =back
2996
2997 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2998 queries.
2999
3000 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
3001 way these aliases are named.
3002
3003 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
3004 otherwise C<"$relname">.
3005
3006 =cut
3007
3008 sub relname_to_table_alias {
3009   my ($self, $relname, $join_count) = @_;
3010
3011   my $alias = ($join_count && $join_count > 1 ?
3012     join('_', $relname, $join_count) : $relname);
3013
3014   return $alias;
3015 }
3016
3017 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
3018 # version and it may be necessary to amend or override it for a specific storage
3019 # if such binds are necessary.
3020 sub _max_column_bytesize {
3021   my ($self, $attr) = @_;
3022
3023   my $max_size;
3024
3025   if ($attr->{sqlt_datatype}) {
3026     my $data_type = lc($attr->{sqlt_datatype});
3027
3028     if ($attr->{sqlt_size}) {
3029
3030       # String/sized-binary types
3031       if ($data_type =~ /^(?:
3032           l? (?:var)? char(?:acter)? (?:\s*varying)?
3033             |
3034           (?:var)? binary (?:\s*varying)? 
3035             |
3036           raw
3037         )\b/x
3038       ) {
3039         $max_size = $attr->{sqlt_size};
3040       }
3041       # Other charset/unicode types, assume scale of 4
3042       elsif ($data_type =~ /^(?:
3043           national \s* character (?:\s*varying)?
3044             |
3045           nchar
3046             |
3047           univarchar
3048             |
3049           nvarchar
3050         )\b/x
3051       ) {
3052         $max_size = $attr->{sqlt_size} * 4;
3053       }
3054     }
3055
3056     if (!$max_size and !$self->_is_lob_type($data_type)) {
3057       $max_size = 100 # for all other (numeric?) datatypes
3058     }
3059   }
3060
3061   $max_size || $self->_dbic_connect_attributes->{LongReadLen} || $self->_get_dbh->{LongReadLen} || 8000;
3062 }
3063
3064 # Determine if a data_type is some type of BLOB
3065 sub _is_lob_type {
3066   my ($self, $data_type) = @_;
3067   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
3068     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
3069                                   |varchar|character\s*varying|nvarchar
3070                                   |national\s*character\s*varying))?\z/xi);
3071 }
3072
3073 sub _is_binary_lob_type {
3074   my ($self, $data_type) = @_;
3075   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
3076     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
3077 }
3078
3079 sub _is_text_lob_type {
3080   my ($self, $data_type) = @_;
3081   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
3082     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
3083                         |national\s*character\s*varying))\z/xi);
3084 }
3085
3086 1;
3087
3088 =head1 USAGE NOTES
3089
3090 =head2 DBIx::Class and AutoCommit
3091
3092 DBIx::Class can do some wonderful magic with handling exceptions,
3093 disconnections, and transactions when you use C<< AutoCommit => 1 >>
3094 (the default) combined with C<txn_do> for transaction support.
3095
3096 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
3097 in an assumed transaction between commits, and you're telling us you'd
3098 like to manage that manually.  A lot of the magic protections offered by
3099 this module will go away.  We can't protect you from exceptions due to database
3100 disconnects because we don't know anything about how to restart your
3101 transactions.  You're on your own for handling all sorts of exceptional
3102 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
3103 be with raw DBI.
3104
3105
3106 =head1 AUTHORS
3107
3108 Matt S. Trout <mst@shadowcatsystems.co.uk>
3109
3110 Andy Grundman <andy@hybridized.org>
3111
3112 =head1 LICENSE
3113
3114 You may distribute this code under the same terms as Perl itself.
3115
3116 =cut