Extra sanity check of a fresh DBI handle
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use Scalar::Util qw/refaddr weaken reftype blessed/;
12 use List::Util qw/first/;
13 use Sub::Name 'subname';
14 use Context::Preserve 'preserve_context';
15 use Try::Tiny;
16 use overload ();
17 use Data::Compare (); # no imports!!! guard against insane architecture
18 use DBI::Const::GetInfoType (); # no import of retarded global hash
19 use namespace::clean;
20
21 # default cursor class, overridable in connect_info attributes
22 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
23
24 __PACKAGE__->mk_group_accessors('inherited' => qw/
25   sql_limit_dialect sql_quote_char sql_name_sep
26 /);
27
28 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
29
30 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
31 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
32
33 __PACKAGE__->sql_name_sep('.');
34
35 __PACKAGE__->mk_group_accessors('simple' => qw/
36   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
37   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts _dbh_autocommit
38   _perform_autoinc_retrieval _autoinc_supplied_for_op
39 /);
40
41 # the values for these accessors are picked out (and deleted) from
42 # the attribute hashref passed to connect_info
43 my @storage_options = qw/
44   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
45   disable_sth_caching unsafe auto_savepoint
46 /;
47 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
48
49
50 # capability definitions, using a 2-tiered accessor system
51 # The rationale is:
52 #
53 # A driver/user may define _use_X, which blindly without any checks says:
54 # "(do not) use this capability", (use_dbms_capability is an "inherited"
55 # type accessor)
56 #
57 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
58 # accessor, which in turn calls _determine_supports_X, and stores the return
59 # in a special slot on the storage object, which is wiped every time a $dbh
60 # reconnection takes place (it is not guaranteed that upon reconnection we
61 # will get the same rdbms version). _determine_supports_X does not need to
62 # exist on a driver, as we ->can for it before calling.
63
64 my @capabilities = (qw/
65   insert_returning
66   insert_returning_bound
67
68   multicolumn_in
69
70   placeholders
71   typeless_placeholders
72
73   join_optimizer
74 /);
75 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
76 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
77
78 # on by default, not strictly a capability (pending rewrite)
79 __PACKAGE__->_use_join_optimizer (1);
80 sub _determine_supports_join_optimizer { 1 };
81
82 # Each of these methods need _determine_driver called before itself
83 # in order to function reliably. This is a purely DRY optimization
84 #
85 # get_(use)_dbms_capability need to be called on the correct Storage
86 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
87 # _determine_supports_X which obv. needs a correct driver as well
88 my @rdbms_specific_methods = qw/
89   sqlt_type
90   sql_maker
91   build_datetime_parser
92   datetime_parser_type
93
94   txn_begin
95   insert
96   insert_bulk
97   update
98   delete
99   select
100   select_single
101   with_deferred_fk_checks
102
103   get_use_dbms_capability
104   get_dbms_capability
105
106   _server_info
107   _get_server_version
108 /;
109
110 for my $meth (@rdbms_specific_methods) {
111
112   my $orig = __PACKAGE__->can ($meth)
113     or die "$meth is not a ::Storage::DBI method!";
114
115   no strict qw/refs/;
116   no warnings qw/redefine/;
117   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
118     if (
119       # only fire when invoked on an instance, a valid class-based invocation
120       # would e.g. be setting a default for an inherited accessor
121       ref $_[0]
122         and
123       ! $_[0]->_driver_determined
124         and
125       ! $_[0]->{_in_determine_driver}
126     ) {
127       $_[0]->_determine_driver;
128
129       # This for some reason crashes and burns on perl 5.8.1
130       # IFF the method ends up throwing an exception
131       #goto $_[0]->can ($meth);
132
133       my $cref = $_[0]->can ($meth);
134       goto $cref;
135     }
136
137     goto $orig;
138   };
139 }
140
141 =head1 NAME
142
143 DBIx::Class::Storage::DBI - DBI storage handler
144
145 =head1 SYNOPSIS
146
147   my $schema = MySchema->connect('dbi:SQLite:my.db');
148
149   $schema->storage->debug(1);
150
151   my @stuff = $schema->storage->dbh_do(
152     sub {
153       my ($storage, $dbh, @args) = @_;
154       $dbh->do("DROP TABLE authors");
155     },
156     @column_list
157   );
158
159   $schema->resultset('Book')->search({
160      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
161   });
162
163 =head1 DESCRIPTION
164
165 This class represents the connection to an RDBMS via L<DBI>.  See
166 L<DBIx::Class::Storage> for general information.  This pod only
167 documents DBI-specific methods and behaviors.
168
169 =head1 METHODS
170
171 =cut
172
173 sub new {
174   my $new = shift->next::method(@_);
175
176   $new->_sql_maker_opts({});
177   $new->_dbh_details({});
178   $new->{_in_do_block} = 0;
179   $new->{_dbh_gen} = 0;
180
181   # read below to see what this does
182   $new->_arm_global_destructor;
183
184   $new;
185 }
186
187 # This is hack to work around perl shooting stuff in random
188 # order on exit(). If we do not walk the remaining storage
189 # objects in an END block, there is a *small but real* chance
190 # of a fork()ed child to kill the parent's shared DBI handle,
191 # *before perl reaches the DESTROY in this package*
192 # Yes, it is ugly and effective.
193 # Additionally this registry is used by the CLONE method to
194 # make sure no handles are shared between threads
195 {
196   my %seek_and_destroy;
197
198   sub _arm_global_destructor {
199     weaken (
200       $seek_and_destroy{ refaddr($_[0]) } = $_[0]
201     );
202   }
203
204   END {
205     local $?; # just in case the DBI destructor changes it somehow
206
207     # destroy just the object if not native to this process
208     $_->_verify_pid for (grep
209       { defined $_ }
210       values %seek_and_destroy
211     );
212   }
213
214   sub CLONE {
215     # As per DBI's recommendation, DBIC disconnects all handles as
216     # soon as possible (DBIC will reconnect only on demand from within
217     # the thread)
218     my @instances = grep { defined $_ } values %seek_and_destroy;
219     for (@instances) {
220       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
221       $_->_dbh(undef);
222
223       $_->transaction_depth(0);
224       $_->savepoints([]);
225     }
226
227     # properly renumber all existing refs
228     %seek_and_destroy = ();
229     $_->_arm_global_destructor for @instances;
230   }
231 }
232
233 sub DESTROY {
234   my $self = shift;
235
236   # some databases spew warnings on implicit disconnect
237   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
238   local $SIG{__WARN__} = sub {};
239   $self->_dbh(undef);
240
241   # this op is necessary, since the very last perl runtime statement
242   # triggers a global destruction shootout, and the $SIG localization
243   # may very well be destroyed before perl actually gets to do the
244   # $dbh undef
245   1;
246 }
247
248 # handle pid changes correctly - do not destroy parent's connection
249 sub _verify_pid {
250   my $self = shift;
251
252   my $pid = $self->_conn_pid;
253   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
254     $dbh->{InactiveDestroy} = 1;
255     $self->{_dbh_gen}++;
256     $self->_dbh(undef);
257     $self->transaction_depth(0);
258     $self->savepoints([]);
259   }
260
261   return;
262 }
263
264 =head2 connect_info
265
266 This method is normally called by L<DBIx::Class::Schema/connection>, which
267 encapsulates its argument list in an arrayref before passing them here.
268
269 The argument list may contain:
270
271 =over
272
273 =item *
274
275 The same 4-element argument set one would normally pass to
276 L<DBI/connect>, optionally followed by
277 L<extra attributes|/DBIx::Class specific connection attributes>
278 recognized by DBIx::Class:
279
280   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
281
282 =item *
283
284 A single code reference which returns a connected
285 L<DBI database handle|DBI/connect> optionally followed by
286 L<extra attributes|/DBIx::Class specific connection attributes> recognized
287 by DBIx::Class:
288
289   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
290
291 =item *
292
293 A single hashref with all the attributes and the dsn/user/password
294 mixed together:
295
296   $connect_info_args = [{
297     dsn => $dsn,
298     user => $user,
299     password => $pass,
300     %dbi_attributes,
301     %extra_attributes,
302   }];
303
304   $connect_info_args = [{
305     dbh_maker => sub { DBI->connect (...) },
306     %dbi_attributes,
307     %extra_attributes,
308   }];
309
310 This is particularly useful for L<Catalyst> based applications, allowing the
311 following config (L<Config::General> style):
312
313   <Model::DB>
314     schema_class   App::DB
315     <connect_info>
316       dsn          dbi:mysql:database=test
317       user         testuser
318       password     TestPass
319       AutoCommit   1
320     </connect_info>
321   </Model::DB>
322
323 The C<dsn>/C<user>/C<password> combination can be substituted by the
324 C<dbh_maker> key whose value is a coderef that returns a connected
325 L<DBI database handle|DBI/connect>
326
327 =back
328
329 Please note that the L<DBI> docs recommend that you always explicitly
330 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
331 recommends that it be set to I<1>, and that you perform transactions
332 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
333 to I<1> if you do not do explicitly set it to zero.  This is the default
334 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
335
336 =head3 DBIx::Class specific connection attributes
337
338 In addition to the standard L<DBI|DBI/ATTRIBUTES COMMON TO ALL HANDLES>
339 L<connection|DBI/Database Handle Attributes> attributes, DBIx::Class recognizes
340 the following connection options. These options can be mixed in with your other
341 L<DBI> connection attributes, or placed in a separate hashref
342 (C<\%extra_attributes>) as shown above.
343
344 Every time C<connect_info> is invoked, any previous settings for
345 these options will be cleared before setting the new ones, regardless of
346 whether any options are specified in the new C<connect_info>.
347
348
349 =over
350
351 =item on_connect_do
352
353 Specifies things to do immediately after connecting or re-connecting to
354 the database.  Its value may contain:
355
356 =over
357
358 =item a scalar
359
360 This contains one SQL statement to execute.
361
362 =item an array reference
363
364 This contains SQL statements to execute in order.  Each element contains
365 a string or a code reference that returns a string.
366
367 =item a code reference
368
369 This contains some code to execute.  Unlike code references within an
370 array reference, its return value is ignored.
371
372 =back
373
374 =item on_disconnect_do
375
376 Takes arguments in the same form as L</on_connect_do> and executes them
377 immediately before disconnecting from the database.
378
379 Note, this only runs if you explicitly call L</disconnect> on the
380 storage object.
381
382 =item on_connect_call
383
384 A more generalized form of L</on_connect_do> that calls the specified
385 C<connect_call_METHOD> methods in your storage driver.
386
387   on_connect_do => 'select 1'
388
389 is equivalent to:
390
391   on_connect_call => [ [ do_sql => 'select 1' ] ]
392
393 Its values may contain:
394
395 =over
396
397 =item a scalar
398
399 Will call the C<connect_call_METHOD> method.
400
401 =item a code reference
402
403 Will execute C<< $code->($storage) >>
404
405 =item an array reference
406
407 Each value can be a method name or code reference.
408
409 =item an array of arrays
410
411 For each array, the first item is taken to be the C<connect_call_> method name
412 or code reference, and the rest are parameters to it.
413
414 =back
415
416 Some predefined storage methods you may use:
417
418 =over
419
420 =item do_sql
421
422 Executes a SQL string or a code reference that returns a SQL string. This is
423 what L</on_connect_do> and L</on_disconnect_do> use.
424
425 It can take:
426
427 =over
428
429 =item a scalar
430
431 Will execute the scalar as SQL.
432
433 =item an arrayref
434
435 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
436 attributes hashref and bind values.
437
438 =item a code reference
439
440 Will execute C<< $code->($storage) >> and execute the return array refs as
441 above.
442
443 =back
444
445 =item datetime_setup
446
447 Execute any statements necessary to initialize the database session to return
448 and accept datetime/timestamp values used with
449 L<DBIx::Class::InflateColumn::DateTime>.
450
451 Only necessary for some databases, see your specific storage driver for
452 implementation details.
453
454 =back
455
456 =item on_disconnect_call
457
458 Takes arguments in the same form as L</on_connect_call> and executes them
459 immediately before disconnecting from the database.
460
461 Calls the C<disconnect_call_METHOD> methods as opposed to the
462 C<connect_call_METHOD> methods called by L</on_connect_call>.
463
464 Note, this only runs if you explicitly call L</disconnect> on the
465 storage object.
466
467 =item disable_sth_caching
468
469 If set to a true value, this option will disable the caching of
470 statement handles via L<DBI/prepare_cached>.
471
472 =item limit_dialect
473
474 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
475 default L</sql_limit_dialect> setting of the storage (if any). For a list
476 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
477
478 =item quote_names
479
480 When true automatically sets L</quote_char> and L</name_sep> to the characters
481 appropriate for your particular RDBMS. This option is preferred over specifying
482 L</quote_char> directly.
483
484 =item quote_char
485
486 Specifies what characters to use to quote table and column names.
487
488 C<quote_char> expects either a single character, in which case is it
489 is placed on either side of the table/column name, or an arrayref of length
490 2 in which case the table/column name is placed between the elements.
491
492 For example under MySQL you should use C<< quote_char => '`' >>, and for
493 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
494
495 =item name_sep
496
497 This parameter is only useful in conjunction with C<quote_char>, and is used to
498 specify the character that separates elements (schemas, tables, columns) from
499 each other. If unspecified it defaults to the most commonly used C<.>.
500
501 =item unsafe
502
503 This Storage driver normally installs its own C<HandleError>, sets
504 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
505 all database handles, including those supplied by a coderef.  It does this
506 so that it can have consistent and useful error behavior.
507
508 If you set this option to a true value, Storage will not do its usual
509 modifications to the database handle's attributes, and instead relies on
510 the settings in your connect_info DBI options (or the values you set in
511 your connection coderef, in the case that you are connecting via coderef).
512
513 Note that your custom settings can cause Storage to malfunction,
514 especially if you set a C<HandleError> handler that suppresses exceptions
515 and/or disable C<RaiseError>.
516
517 =item auto_savepoint
518
519 If this option is true, L<DBIx::Class> will use savepoints when nesting
520 transactions, making it possible to recover from failure in the inner
521 transaction without having to abort all outer transactions.
522
523 =item cursor_class
524
525 Use this argument to supply a cursor class other than the default
526 L<DBIx::Class::Storage::DBI::Cursor>.
527
528 =back
529
530 Some real-life examples of arguments to L</connect_info> and
531 L<DBIx::Class::Schema/connect>
532
533   # Simple SQLite connection
534   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
535
536   # Connect via subref
537   ->connect_info([ sub { DBI->connect(...) } ]);
538
539   # Connect via subref in hashref
540   ->connect_info([{
541     dbh_maker => sub { DBI->connect(...) },
542     on_connect_do => 'alter session ...',
543   }]);
544
545   # A bit more complicated
546   ->connect_info(
547     [
548       'dbi:Pg:dbname=foo',
549       'postgres',
550       'my_pg_password',
551       { AutoCommit => 1 },
552       { quote_char => q{"} },
553     ]
554   );
555
556   # Equivalent to the previous example
557   ->connect_info(
558     [
559       'dbi:Pg:dbname=foo',
560       'postgres',
561       'my_pg_password',
562       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
563     ]
564   );
565
566   # Same, but with hashref as argument
567   # See parse_connect_info for explanation
568   ->connect_info(
569     [{
570       dsn         => 'dbi:Pg:dbname=foo',
571       user        => 'postgres',
572       password    => 'my_pg_password',
573       AutoCommit  => 1,
574       quote_char  => q{"},
575       name_sep    => q{.},
576     }]
577   );
578
579   # Subref + DBIx::Class-specific connection options
580   ->connect_info(
581     [
582       sub { DBI->connect(...) },
583       {
584           quote_char => q{`},
585           name_sep => q{@},
586           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
587           disable_sth_caching => 1,
588       },
589     ]
590   );
591
592
593
594 =cut
595
596 sub connect_info {
597   my ($self, $info) = @_;
598
599   return $self->_connect_info if !$info;
600
601   $self->_connect_info($info); # copy for _connect_info
602
603   $info = $self->_normalize_connect_info($info)
604     if ref $info eq 'ARRAY';
605
606   for my $storage_opt (keys %{ $info->{storage_options} }) {
607     my $value = $info->{storage_options}{$storage_opt};
608
609     $self->$storage_opt($value);
610   }
611
612   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
613   #  the new set of options
614   $self->_sql_maker(undef);
615   $self->_sql_maker_opts({});
616
617   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
618     my $value = $info->{sql_maker_options}{$sql_maker_opt};
619
620     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
621   }
622
623   my %attrs = (
624     %{ $self->_default_dbi_connect_attributes || {} },
625     %{ $info->{attributes} || {} },
626   );
627
628   my @args = @{ $info->{arguments} };
629
630   if (keys %attrs and ref $args[0] ne 'CODE') {
631     carp
632         'You provided explicit AutoCommit => 0 in your connection_info. '
633       . 'This is almost universally a bad idea (see the footnotes of '
634       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
635       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
636       . 'this warning.'
637       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
638
639     push @args, \%attrs if keys %attrs;
640   }
641   $self->_dbi_connect_info(\@args);
642
643   # FIXME - dirty:
644   # save attributes them in a separate accessor so they are always
645   # introspectable, even in case of a CODE $dbhmaker
646   $self->_dbic_connect_attributes (\%attrs);
647
648   return $self->_connect_info;
649 }
650
651 sub _normalize_connect_info {
652   my ($self, $info_arg) = @_;
653   my %info;
654
655   my @args = @$info_arg;  # take a shallow copy for further mutilation
656
657   # combine/pre-parse arguments depending on invocation style
658
659   my %attrs;
660   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
661     %attrs = %{ $args[1] || {} };
662     @args = $args[0];
663   }
664   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
665     %attrs = %{$args[0]};
666     @args = ();
667     if (my $code = delete $attrs{dbh_maker}) {
668       @args = $code;
669
670       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
671       if (@ignored) {
672         carp sprintf (
673             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
674           . "to the result of 'dbh_maker'",
675
676           join (', ', map { "'$_'" } (@ignored) ),
677         );
678       }
679     }
680     else {
681       @args = delete @attrs{qw/dsn user password/};
682     }
683   }
684   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
685     %attrs = (
686       % { $args[3] || {} },
687       % { $args[4] || {} },
688     );
689     @args = @args[0,1,2];
690   }
691
692   $info{arguments} = \@args;
693
694   my @storage_opts = grep exists $attrs{$_},
695     @storage_options, 'cursor_class';
696
697   @{ $info{storage_options} }{@storage_opts} =
698     delete @attrs{@storage_opts} if @storage_opts;
699
700   my @sql_maker_opts = grep exists $attrs{$_},
701     qw/limit_dialect quote_char name_sep quote_names/;
702
703   @{ $info{sql_maker_options} }{@sql_maker_opts} =
704     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
705
706   $info{attributes} = \%attrs if %attrs;
707
708   return \%info;
709 }
710
711 sub _default_dbi_connect_attributes () {
712   +{
713     AutoCommit => 1,
714     PrintError => 0,
715     RaiseError => 1,
716     ShowErrorStatement => 1,
717   };
718 }
719
720 =head2 on_connect_do
721
722 This method is deprecated in favour of setting via L</connect_info>.
723
724 =cut
725
726 =head2 on_disconnect_do
727
728 This method is deprecated in favour of setting via L</connect_info>.
729
730 =cut
731
732 sub _parse_connect_do {
733   my ($self, $type) = @_;
734
735   my $val = $self->$type;
736   return () if not defined $val;
737
738   my @res;
739
740   if (not ref($val)) {
741     push @res, [ 'do_sql', $val ];
742   } elsif (ref($val) eq 'CODE') {
743     push @res, $val;
744   } elsif (ref($val) eq 'ARRAY') {
745     push @res, map { [ 'do_sql', $_ ] } @$val;
746   } else {
747     $self->throw_exception("Invalid type for $type: ".ref($val));
748   }
749
750   return \@res;
751 }
752
753 =head2 dbh_do
754
755 Arguments: ($subref | $method_name), @extra_coderef_args?
756
757 Execute the given $subref or $method_name using the new exception-based
758 connection management.
759
760 The first two arguments will be the storage object that C<dbh_do> was called
761 on and a database handle to use.  Any additional arguments will be passed
762 verbatim to the called subref as arguments 2 and onwards.
763
764 Using this (instead of $self->_dbh or $self->dbh) ensures correct
765 exception handling and reconnection (or failover in future subclasses).
766
767 Your subref should have no side-effects outside of the database, as
768 there is the potential for your subref to be partially double-executed
769 if the database connection was stale/dysfunctional.
770
771 Example:
772
773   my @stuff = $schema->storage->dbh_do(
774     sub {
775       my ($storage, $dbh, @cols) = @_;
776       my $cols = join(q{, }, @cols);
777       $dbh->selectrow_array("SELECT $cols FROM foo");
778     },
779     @column_list
780   );
781
782 =cut
783
784 sub dbh_do {
785   my $self = shift;
786   my $run_target = shift;
787
788   # short circuit when we know there is no need for a runner
789   #
790   # FIXME - asumption may be wrong
791   # the rationale for the txn_depth check is that if this block is a part
792   # of a larger transaction, everything up to that point is screwed anyway
793   return $self->$run_target($self->_get_dbh, @_)
794     if $self->{_in_do_block} or $self->transaction_depth;
795
796   my $cref = (ref $run_target eq 'CODE')
797     ? $run_target
798     : $self->can($run_target) || $self->throw_exception(sprintf (
799       'Can\'t locate object method "%s" via package "%s"',
800       $run_target,
801       (ref $self || $self),
802     ))
803   ;
804
805   # take a ref instead of a copy, to preserve @_ aliasing
806   # semantics within the coderef, but only if needed
807   # (pseudoforking doesn't like this trick much)
808   my $args = @_ ? \@_ : [];
809   unshift @$args, $self, $self->_get_dbh;
810
811   DBIx::Class::Storage::BlockRunner->new(
812     storage => $self,
813     run_code => $cref,
814     run_args => $args,
815     wrap_txn => 0,
816     retry_handler => sub { ! ( $_[0]->retried_count or $_[0]->storage->connected ) },
817   )->run;
818 }
819
820 sub txn_do {
821   $_[0]->_get_dbh; # connects or reconnects on pid change, necessary to grab correct txn_depth
822   shift->next::method(@_);
823 }
824
825 =head2 disconnect
826
827 Our C<disconnect> method also performs a rollback first if the
828 database is not in C<AutoCommit> mode.
829
830 =cut
831
832 sub disconnect {
833   my ($self) = @_;
834
835   if( $self->_dbh ) {
836     my @actions;
837
838     push @actions, ( $self->on_disconnect_call || () );
839     push @actions, $self->_parse_connect_do ('on_disconnect_do');
840
841     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
842
843     # stops the "implicit rollback on disconnect" warning
844     $self->_exec_txn_rollback unless $self->_dbh_autocommit;
845
846     %{ $self->_dbh->{CachedKids} } = ();
847     $self->_dbh->disconnect;
848     $self->_dbh(undef);
849     $self->{_dbh_gen}++;
850   }
851 }
852
853 =head2 with_deferred_fk_checks
854
855 =over 4
856
857 =item Arguments: C<$coderef>
858
859 =item Return Value: The return value of $coderef
860
861 =back
862
863 Storage specific method to run the code ref with FK checks deferred or
864 in MySQL's case disabled entirely.
865
866 =cut
867
868 # Storage subclasses should override this
869 sub with_deferred_fk_checks {
870   my ($self, $sub) = @_;
871   $sub->();
872 }
873
874 =head2 connected
875
876 =over
877
878 =item Arguments: none
879
880 =item Return Value: 1|0
881
882 =back
883
884 Verifies that the current database handle is active and ready to execute
885 an SQL statement (e.g. the connection did not get stale, server is still
886 answering, etc.) This method is used internally by L</dbh>.
887
888 =cut
889
890 sub connected {
891   my $self = shift;
892   return 0 unless $self->_seems_connected;
893
894   #be on the safe side
895   local $self->_dbh->{RaiseError} = 1;
896
897   return $self->_ping;
898 }
899
900 sub _seems_connected {
901   my $self = shift;
902
903   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
904
905   my $dbh = $self->_dbh
906     or return 0;
907
908   return $dbh->FETCH('Active');
909 }
910
911 sub _ping {
912   my $self = shift;
913
914   my $dbh = $self->_dbh or return 0;
915
916   return $dbh->ping;
917 }
918
919 sub ensure_connected {
920   my ($self) = @_;
921
922   unless ($self->connected) {
923     $self->_populate_dbh;
924   }
925 }
926
927 =head2 dbh
928
929 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
930 is guaranteed to be healthy by implicitly calling L</connected>, and if
931 necessary performing a reconnection before returning. Keep in mind that this
932 is very B<expensive> on some database engines. Consider using L</dbh_do>
933 instead.
934
935 =cut
936
937 sub dbh {
938   my ($self) = @_;
939
940   if (not $self->_dbh) {
941     $self->_populate_dbh;
942   } else {
943     $self->ensure_connected;
944   }
945   return $self->_dbh;
946 }
947
948 # this is the internal "get dbh or connect (don't check)" method
949 sub _get_dbh {
950   my $self = shift;
951   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
952   $self->_populate_dbh unless $self->_dbh;
953   return $self->_dbh;
954 }
955
956 sub sql_maker {
957   my ($self) = @_;
958   unless ($self->_sql_maker) {
959     my $sql_maker_class = $self->sql_maker_class;
960
961     my %opts = %{$self->_sql_maker_opts||{}};
962     my $dialect =
963       $opts{limit_dialect}
964         ||
965       $self->sql_limit_dialect
966         ||
967       do {
968         my $s_class = (ref $self) || $self;
969         carp (
970           "Your storage class ($s_class) does not set sql_limit_dialect and you "
971         . 'have not supplied an explicit limit_dialect in your connection_info. '
972         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
973         . 'databases but can be (and often is) painfully slow. '
974         . "Please file an RT ticket against '$s_class' ."
975         );
976
977         'GenericSubQ';
978       }
979     ;
980
981     my ($quote_char, $name_sep);
982
983     if ($opts{quote_names}) {
984       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
985         my $s_class = (ref $self) || $self;
986         carp (
987           "You requested 'quote_names' but your storage class ($s_class) does "
988         . 'not explicitly define a default sql_quote_char and you have not '
989         . 'supplied a quote_char as part of your connection_info. DBIC will '
990         .q{default to the ANSI SQL standard quote '"', which works most of }
991         . "the time. Please file an RT ticket against '$s_class'."
992         );
993
994         '"'; # RV
995       };
996
997       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
998     }
999
1000     $self->_sql_maker($sql_maker_class->new(
1001       bindtype=>'columns',
1002       array_datatypes => 1,
1003       limit_dialect => $dialect,
1004       ($quote_char ? (quote_char => $quote_char) : ()),
1005       name_sep => ($name_sep || '.'),
1006       %opts,
1007     ));
1008   }
1009   return $self->_sql_maker;
1010 }
1011
1012 # nothing to do by default
1013 sub _rebless {}
1014 sub _init {}
1015
1016 sub _populate_dbh {
1017   my ($self) = @_;
1018
1019   my @info = @{$self->_dbi_connect_info || []};
1020   $self->_dbh(undef); # in case ->connected failed we might get sent here
1021   $self->_dbh_details({}); # reset everything we know
1022
1023   $self->_dbh($self->_connect(@info));
1024
1025   $self->_conn_pid($$) unless DBIx::Class::_ENV_::BROKEN_FORK; # on win32 these are in fact threads
1026
1027   $self->_determine_driver;
1028
1029   # Always set the transaction depth on connect, since
1030   #  there is no transaction in progress by definition
1031   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1032
1033   $self->_run_connection_actions unless $self->{_in_determine_driver};
1034 }
1035
1036 sub _run_connection_actions {
1037   my $self = shift;
1038   my @actions;
1039
1040   push @actions, ( $self->on_connect_call || () );
1041   push @actions, $self->_parse_connect_do ('on_connect_do');
1042
1043   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1044 }
1045
1046
1047
1048 sub set_use_dbms_capability {
1049   $_[0]->set_inherited ($_[1], $_[2]);
1050 }
1051
1052 sub get_use_dbms_capability {
1053   my ($self, $capname) = @_;
1054
1055   my $use = $self->get_inherited ($capname);
1056   return defined $use
1057     ? $use
1058     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1059   ;
1060 }
1061
1062 sub set_dbms_capability {
1063   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1064 }
1065
1066 sub get_dbms_capability {
1067   my ($self, $capname) = @_;
1068
1069   my $cap = $self->_dbh_details->{capability}{$capname};
1070
1071   unless (defined $cap) {
1072     if (my $meth = $self->can ("_determine$capname")) {
1073       $cap = $self->$meth ? 1 : 0;
1074     }
1075     else {
1076       $cap = 0;
1077     }
1078
1079     $self->set_dbms_capability ($capname, $cap);
1080   }
1081
1082   return $cap;
1083 }
1084
1085 sub _server_info {
1086   my $self = shift;
1087
1088   my $info;
1089   unless ($info = $self->_dbh_details->{info}) {
1090
1091     $info = {};
1092
1093     my $server_version = try {
1094       $self->_get_server_version
1095     } catch {
1096       # driver determination *may* use this codepath
1097       # in which case we must rethrow
1098       $self->throw_exception($_) if $self->{_in_determine_driver};
1099
1100       # $server_version on failure
1101       undef;
1102     };
1103
1104     if (defined $server_version) {
1105       $info->{dbms_version} = $server_version;
1106
1107       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1108       my @verparts = split (/\./, $numeric_version);
1109       if (
1110         @verparts
1111           &&
1112         $verparts[0] <= 999
1113       ) {
1114         # consider only up to 3 version parts, iff not more than 3 digits
1115         my @use_parts;
1116         while (@verparts && @use_parts < 3) {
1117           my $p = shift @verparts;
1118           last if $p > 999;
1119           push @use_parts, $p;
1120         }
1121         push @use_parts, 0 while @use_parts < 3;
1122
1123         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1124       }
1125     }
1126
1127     $self->_dbh_details->{info} = $info;
1128   }
1129
1130   return $info;
1131 }
1132
1133 sub _get_server_version {
1134   shift->_dbh_get_info('SQL_DBMS_VER');
1135 }
1136
1137 sub _dbh_get_info {
1138   my ($self, $info) = @_;
1139
1140   if ($info =~ /[^0-9]/) {
1141     $info = $DBI::Const::GetInfoType::GetInfoType{$info};
1142     $self->throw_exception("Info type '$_[1]' not provided by DBI::Const::GetInfoType")
1143       unless defined $info;
1144   }
1145
1146   return $self->_get_dbh->get_info($info);
1147 }
1148
1149 sub _describe_connection {
1150   require DBI::Const::GetInfoReturn;
1151
1152   my $self = shift;
1153   $self->ensure_connected;
1154
1155   my $res = {
1156     DBIC_DSN => $self->_dbi_connect_info->[0],
1157     DBI_VER => DBI->VERSION,
1158     DBIC_VER => DBIx::Class->VERSION,
1159     DBIC_DRIVER => ref $self,
1160   };
1161
1162   for my $inf (
1163     #keys %DBI::Const::GetInfoType::GetInfoType,
1164     qw/
1165       SQL_CURSOR_COMMIT_BEHAVIOR
1166       SQL_CURSOR_ROLLBACK_BEHAVIOR
1167       SQL_CURSOR_SENSITIVITY
1168       SQL_DATA_SOURCE_NAME
1169       SQL_DBMS_NAME
1170       SQL_DBMS_VER
1171       SQL_DEFAULT_TXN_ISOLATION
1172       SQL_DM_VER
1173       SQL_DRIVER_NAME
1174       SQL_DRIVER_ODBC_VER
1175       SQL_DRIVER_VER
1176       SQL_EXPRESSIONS_IN_ORDERBY
1177       SQL_GROUP_BY
1178       SQL_IDENTIFIER_CASE
1179       SQL_IDENTIFIER_QUOTE_CHAR
1180       SQL_MAX_CATALOG_NAME_LEN
1181       SQL_MAX_COLUMN_NAME_LEN
1182       SQL_MAX_IDENTIFIER_LEN
1183       SQL_MAX_TABLE_NAME_LEN
1184       SQL_MULTIPLE_ACTIVE_TXN
1185       SQL_MULT_RESULT_SETS
1186       SQL_NEED_LONG_DATA_LEN
1187       SQL_NON_NULLABLE_COLUMNS
1188       SQL_ODBC_VER
1189       SQL_QUALIFIER_NAME_SEPARATOR
1190       SQL_QUOTED_IDENTIFIER_CASE
1191       SQL_TXN_CAPABLE
1192       SQL_TXN_ISOLATION_OPTION
1193     /
1194   ) {
1195     my $v = $self->_dbh_get_info($inf);
1196     next unless defined $v;
1197
1198     #my $key = sprintf( '%s(%s)', $inf, $DBI::Const::GetInfoType::GetInfoType{$inf} );
1199     my $expl = DBI::Const::GetInfoReturn::Explain($inf, $v);
1200     $res->{$inf} = DBI::Const::GetInfoReturn::Format($inf, $v) . ( $expl ? " ($expl)" : '' );
1201   }
1202
1203   $res;
1204 }
1205
1206 sub _determine_driver {
1207   my ($self) = @_;
1208
1209   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1210     my $started_connected = 0;
1211     local $self->{_in_determine_driver} = 1;
1212
1213     if (ref($self) eq __PACKAGE__) {
1214       my $driver;
1215       if ($self->_dbh) { # we are connected
1216         $driver = $self->_dbh->{Driver}{Name};
1217         $started_connected = 1;
1218       }
1219       else {
1220         # if connect_info is a CODEREF, we have no choice but to connect
1221         if (ref $self->_dbi_connect_info->[0] &&
1222             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1223           $self->_populate_dbh;
1224           $driver = $self->_dbh->{Driver}{Name};
1225         }
1226         else {
1227           # try to use dsn to not require being connected, the driver may still
1228           # force a connection in _rebless to determine version
1229           # (dsn may not be supplied at all if all we do is make a mock-schema)
1230           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1231           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1232           $driver ||= $ENV{DBI_DRIVER};
1233         }
1234       }
1235
1236       if ($driver) {
1237         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1238         if ($self->load_optional_class($storage_class)) {
1239           mro::set_mro($storage_class, 'c3');
1240           bless $self, $storage_class;
1241           $self->_rebless();
1242         }
1243         else {
1244           $self->_warn_undetermined_driver(
1245             'This version of DBIC does not yet seem to supply a driver for '
1246           . "your particular RDBMS and/or connection method ('$driver')."
1247           );
1248         }
1249       }
1250       else {
1251         $self->_warn_undetermined_driver(
1252           'Unable to extract a driver name from connect info - this '
1253         . 'should not have happened.'
1254         );
1255       }
1256     }
1257
1258     $self->_driver_determined(1);
1259
1260     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1261
1262     if ($self->can('source_bind_attributes')) {
1263       $self->throw_exception(
1264         "Your storage subclass @{[ ref $self ]} provides (or inherits) the method "
1265       . 'source_bind_attributes() for which support has been removed as of Jan 2013. '
1266       . 'If you are not sure how to proceed please contact the development team via '
1267       . 'http://search.cpan.org/dist/DBIx-Class/lib/DBIx/Class.pm#GETTING_HELP/SUPPORT'
1268       );
1269     }
1270
1271     $self->_init; # run driver-specific initializations
1272
1273     $self->_run_connection_actions
1274         if !$started_connected && defined $self->_dbh;
1275   }
1276 }
1277
1278 sub _determine_connector_driver {
1279   my ($self, $conn) = @_;
1280
1281   my $dbtype = $self->_dbh_get_info('SQL_DBMS_NAME');
1282
1283   if (not $dbtype) {
1284     $self->_warn_undetermined_driver(
1285       'Unable to retrieve RDBMS type (SQL_DBMS_NAME) of the engine behind your '
1286     . "$conn connector - this should not have happened."
1287     );
1288     return;
1289   }
1290
1291   $dbtype =~ s/\W/_/gi;
1292
1293   my $subclass = "DBIx::Class::Storage::DBI::${conn}::${dbtype}";
1294   return if $self->isa($subclass);
1295
1296   if ($self->load_optional_class($subclass)) {
1297     bless $self, $subclass;
1298     $self->_rebless;
1299   }
1300   else {
1301     $self->_warn_undetermined_driver(
1302       'This version of DBIC does not yet seem to supply a driver for '
1303     . "your particular RDBMS and/or connection method ('$conn/$dbtype')."
1304     );
1305   }
1306 }
1307
1308 sub _warn_undetermined_driver {
1309   my ($self, $msg) = @_;
1310
1311   require Data::Dumper::Concise;
1312
1313   carp_once ($msg . ' While we will attempt to continue anyway, the results '
1314   . 'are likely to be underwhelming. Please upgrade DBIC, and if this message '
1315   . "does not go away, file a bugreport including the following info:\n"
1316   . Data::Dumper::Concise::Dumper($self->_describe_connection)
1317   );
1318 }
1319
1320 sub _do_connection_actions {
1321   my $self          = shift;
1322   my $method_prefix = shift;
1323   my $call          = shift;
1324
1325   if (not ref($call)) {
1326     my $method = $method_prefix . $call;
1327     $self->$method(@_);
1328   } elsif (ref($call) eq 'CODE') {
1329     $self->$call(@_);
1330   } elsif (ref($call) eq 'ARRAY') {
1331     if (ref($call->[0]) ne 'ARRAY') {
1332       $self->_do_connection_actions($method_prefix, $_) for @$call;
1333     } else {
1334       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1335     }
1336   } else {
1337     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1338   }
1339
1340   return $self;
1341 }
1342
1343 sub connect_call_do_sql {
1344   my $self = shift;
1345   $self->_do_query(@_);
1346 }
1347
1348 sub disconnect_call_do_sql {
1349   my $self = shift;
1350   $self->_do_query(@_);
1351 }
1352
1353 # override in db-specific backend when necessary
1354 sub connect_call_datetime_setup { 1 }
1355
1356 sub _do_query {
1357   my ($self, $action) = @_;
1358
1359   if (ref $action eq 'CODE') {
1360     $action = $action->($self);
1361     $self->_do_query($_) foreach @$action;
1362   }
1363   else {
1364     # Most debuggers expect ($sql, @bind), so we need to exclude
1365     # the attribute hash which is the second argument to $dbh->do
1366     # furthermore the bind values are usually to be presented
1367     # as named arrayref pairs, so wrap those here too
1368     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1369     my $sql = shift @do_args;
1370     my $attrs = shift @do_args;
1371     my @bind = map { [ undef, $_ ] } @do_args;
1372
1373     $self->dbh_do(sub {
1374       $_[0]->_query_start($sql, \@bind);
1375       $_[1]->do($sql, $attrs, @do_args);
1376       $_[0]->_query_end($sql, \@bind);
1377     });
1378   }
1379
1380   return $self;
1381 }
1382
1383 sub _connect {
1384   my ($self, @info) = @_;
1385
1386   $self->throw_exception("You failed to provide any connection info")
1387     if !@info;
1388
1389   my ($old_connect_via, $dbh);
1390
1391   local $DBI::connect_via = 'connect' if $INC{'Apache/DBI.pm'} && $ENV{MOD_PERL};
1392
1393   try {
1394     if(ref $info[0] eq 'CODE') {
1395       $dbh = $info[0]->();
1396     }
1397     else {
1398       require DBI;
1399       $dbh = DBI->connect(@info);
1400     }
1401
1402     die $DBI::errstr unless $dbh;
1403
1404     die sprintf ("%s fresh DBI handle with a *false* 'Active' attribute. "
1405       . 'This handle is disconnected as far as DBIC is concerned, and we can '
1406       . 'not continue',
1407       ref $info[0] eq 'CODE'
1408         ? "Connection coderef $info[0] returned a"
1409         : 'DBI->connect($schema->storage->connect_info) resulted in a'
1410     ) unless $dbh->FETCH('Active');
1411
1412     # sanity checks unless asked otherwise
1413     unless ($self->unsafe) {
1414
1415       $self->throw_exception(
1416         'Refusing clobbering of {HandleError} installed on externally supplied '
1417        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1418       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1419
1420       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1421       # request, or an external handle. Complain and set anyway
1422       unless ($dbh->{RaiseError}) {
1423         carp( ref $info[0] eq 'CODE'
1424
1425           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1426            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1427            .'attribute has been supplied'
1428
1429           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1430            .'unsafe => 1. Toggling RaiseError back to true'
1431         );
1432
1433         $dbh->{RaiseError} = 1;
1434       }
1435
1436       # this odd anonymous coderef dereference is in fact really
1437       # necessary to avoid the unwanted effect described in perl5
1438       # RT#75792
1439       sub {
1440         my $weak_self = $_[0];
1441         weaken $weak_self;
1442
1443         # the coderef is blessed so we can distinguish it from externally
1444         # supplied handles (which must be preserved)
1445         $_[1]->{HandleError} = bless sub {
1446           if ($weak_self) {
1447             $weak_self->throw_exception("DBI Exception: $_[0]");
1448           }
1449           else {
1450             # the handler may be invoked by something totally out of
1451             # the scope of DBIC
1452             DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1453           }
1454         }, '__DBIC__DBH__ERROR__HANDLER__';
1455       }->($self, $dbh);
1456     }
1457   }
1458   catch {
1459     $self->throw_exception("DBI Connection failed: $_")
1460   };
1461
1462   $self->_dbh_autocommit($dbh->{AutoCommit});
1463   $dbh;
1464 }
1465
1466 sub txn_begin {
1467   my $self = shift;
1468
1469   # this means we have not yet connected and do not know the AC status
1470   # (e.g. coderef $dbh), need a full-fledged connection check
1471   if (! defined $self->_dbh_autocommit) {
1472     $self->ensure_connected;
1473   }
1474   # Otherwise simply connect or re-connect on pid changes
1475   else {
1476     $self->_get_dbh;
1477   }
1478
1479   $self->next::method(@_);
1480 }
1481
1482 sub _exec_txn_begin {
1483   my $self = shift;
1484
1485   # if the user is utilizing txn_do - good for him, otherwise we need to
1486   # ensure that the $dbh is healthy on BEGIN.
1487   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1488   # will be replaced by a failure of begin_work itself (which will be
1489   # then retried on reconnect)
1490   if ($self->{_in_do_block}) {
1491     $self->_dbh->begin_work;
1492   } else {
1493     $self->dbh_do(sub { $_[1]->begin_work });
1494   }
1495 }
1496
1497 sub txn_commit {
1498   my $self = shift;
1499
1500   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1501   $self->throw_exception("Unable to txn_commit() on a disconnected storage")
1502     unless $self->_dbh;
1503
1504   # esoteric case for folks using external $dbh handles
1505   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1506     carp "Storage transaction_depth 0 does not match "
1507         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1508     $self->transaction_depth(1);
1509   }
1510
1511   $self->next::method(@_);
1512
1513   # if AutoCommit is disabled txn_depth never goes to 0
1514   # as a new txn is started immediately on commit
1515   $self->transaction_depth(1) if (
1516     !$self->transaction_depth
1517       and
1518     defined $self->_dbh_autocommit
1519       and
1520     ! $self->_dbh_autocommit
1521   );
1522 }
1523
1524 sub _exec_txn_commit {
1525   shift->_dbh->commit;
1526 }
1527
1528 sub txn_rollback {
1529   my $self = shift;
1530
1531   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1532   $self->throw_exception("Unable to txn_rollback() on a disconnected storage")
1533     unless $self->_dbh;
1534
1535   # esoteric case for folks using external $dbh handles
1536   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1537     carp "Storage transaction_depth 0 does not match "
1538         ."false AutoCommit of $self->{_dbh}, attempting ROLLBACK anyway";
1539     $self->transaction_depth(1);
1540   }
1541
1542   $self->next::method(@_);
1543
1544   # if AutoCommit is disabled txn_depth never goes to 0
1545   # as a new txn is started immediately on commit
1546   $self->transaction_depth(1) if (
1547     !$self->transaction_depth
1548       and
1549     defined $self->_dbh_autocommit
1550       and
1551     ! $self->_dbh_autocommit
1552   );
1553 }
1554
1555 sub _exec_txn_rollback {
1556   shift->_dbh->rollback;
1557 }
1558
1559 # generate some identical methods
1560 for my $meth (qw/svp_begin svp_release svp_rollback/) {
1561   no strict qw/refs/;
1562   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
1563     my $self = shift;
1564     $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1565     $self->throw_exception("Unable to $meth() on a disconnected storage")
1566       unless $self->_dbh;
1567     $self->next::method(@_);
1568   };
1569 }
1570
1571 # This used to be the top-half of _execute.  It was split out to make it
1572 #  easier to override in NoBindVars without duping the rest.  It takes up
1573 #  all of _execute's args, and emits $sql, @bind.
1574 sub _prep_for_execute {
1575   #my ($self, $op, $ident, $args) = @_;
1576   return shift->_gen_sql_bind(@_)
1577 }
1578
1579 sub _gen_sql_bind {
1580   my ($self, $op, $ident, $args) = @_;
1581
1582   my ($colinfos, $from);
1583   if ( blessed($ident) ) {
1584     $from = $ident->from;
1585     $colinfos = $ident->columns_info;
1586   }
1587
1588   my ($sql, @bind) = $self->sql_maker->$op( ($from || $ident), @$args );
1589
1590   if (
1591     ! $ENV{DBIC_DT_SEARCH_OK}
1592       and
1593     $op eq 'select'
1594       and
1595     first { blessed($_->[1]) && $_->[1]->isa('DateTime') } @bind
1596   ) {
1597     carp_unique 'DateTime objects passed to search() are not supported '
1598       . 'properly (InflateColumn::DateTime formats and settings are not '
1599       . 'respected.) See "Formatting DateTime objects in queries" in '
1600       . 'DBIx::Class::Manual::Cookbook. To disable this warning for good '
1601       . 'set $ENV{DBIC_DT_SEARCH_OK} to true'
1602   }
1603
1604   return( $sql, $self->_resolve_bindattrs(
1605     $ident, [ @{$args->[2]{bind}||[]}, @bind ], $colinfos
1606   ));
1607 }
1608
1609 sub _resolve_bindattrs {
1610   my ($self, $ident, $bind, $colinfos) = @_;
1611
1612   $colinfos ||= {};
1613
1614   my $resolve_bindinfo = sub {
1615     #my $infohash = shift;
1616
1617     %$colinfos = %{ $self->_resolve_column_info($ident) }
1618       unless keys %$colinfos;
1619
1620     my $ret;
1621     if (my $col = $_[0]->{dbic_colname}) {
1622       $ret = { %{$_[0]} };
1623
1624       $ret->{sqlt_datatype} ||= $colinfos->{$col}{data_type}
1625         if $colinfos->{$col}{data_type};
1626
1627       $ret->{sqlt_size} ||= $colinfos->{$col}{size}
1628         if $colinfos->{$col}{size};
1629     }
1630
1631     $ret || $_[0];
1632   };
1633
1634   return [ map {
1635     if (ref $_ ne 'ARRAY') {
1636       [{}, $_]
1637     }
1638     elsif (! defined $_->[0]) {
1639       [{}, $_->[1]]
1640     }
1641     elsif (ref $_->[0] eq 'HASH') {
1642       [
1643         ($_->[0]{dbd_attrs} or $_->[0]{sqlt_datatype}) ? $_->[0] : $resolve_bindinfo->($_->[0]),
1644         $_->[1]
1645       ]
1646     }
1647     elsif (ref $_->[0] eq 'SCALAR') {
1648       [ { sqlt_datatype => ${$_->[0]} }, $_->[1] ]
1649     }
1650     else {
1651       [ $resolve_bindinfo->({ dbic_colname => $_->[0] }), $_->[1] ]
1652     }
1653   } @$bind ];
1654 }
1655
1656 sub _format_for_trace {
1657   #my ($self, $bind) = @_;
1658
1659   ### Turn @bind from something like this:
1660   ###   ( [ "artist", 1 ], [ \%attrs, 3 ] )
1661   ### to this:
1662   ###   ( "'1'", "'3'" )
1663
1664   map {
1665     defined( $_ && $_->[1] )
1666       ? qq{'$_->[1]'}
1667       : q{NULL}
1668   } @{$_[1] || []};
1669 }
1670
1671 sub _query_start {
1672   my ( $self, $sql, $bind ) = @_;
1673
1674   $self->debugobj->query_start( $sql, $self->_format_for_trace($bind) )
1675     if $self->debug;
1676 }
1677
1678 sub _query_end {
1679   my ( $self, $sql, $bind ) = @_;
1680
1681   $self->debugobj->query_end( $sql, $self->_format_for_trace($bind) )
1682     if $self->debug;
1683 }
1684
1685 sub _dbi_attrs_for_bind {
1686   my ($self, $ident, $bind) = @_;
1687
1688   my @attrs;
1689
1690   for (map { $_->[0] } @$bind) {
1691     push @attrs, do {
1692       if (exists $_->{dbd_attrs}) {
1693         $_->{dbd_attrs}
1694       }
1695       elsif($_->{sqlt_datatype}) {
1696         # cache the result in the dbh_details hash, as it can not change unless
1697         # we connect to something else
1698         my $cache = $self->_dbh_details->{_datatype_map_cache} ||= {};
1699         if (not exists $cache->{$_->{sqlt_datatype}}) {
1700           $cache->{$_->{sqlt_datatype}} = $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
1701         }
1702         $cache->{$_->{sqlt_datatype}};
1703       }
1704       else {
1705         undef;  # always push something at this position
1706       }
1707     }
1708   }
1709
1710   return \@attrs;
1711 }
1712
1713 sub _execute {
1714   my ($self, $op, $ident, @args) = @_;
1715
1716   my ($sql, $bind) = $self->_prep_for_execute($op, $ident, \@args);
1717
1718   shift->dbh_do(    # retry over disconnects
1719     '_dbh_execute',
1720     $sql,
1721     $bind,
1722     $ident,
1723   );
1724 }
1725
1726 sub _dbh_execute {
1727   my ($self, undef, $sql, $bind, $ident) = @_;
1728
1729   $self->_query_start( $sql, $bind );
1730
1731   my $bind_attrs = $self->_dbi_attrs_for_bind($ident, $bind);
1732
1733   my $sth = $self->_sth($sql);
1734
1735   for my $i (0 .. $#$bind) {
1736     if (ref $bind->[$i][1] eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1737       $sth->bind_param_inout(
1738         $i + 1, # bind params counts are 1-based
1739         $bind->[$i][1],
1740         $bind->[$i][0]{dbd_size} || $self->_max_column_bytesize($bind->[$i][0]), # size
1741         $bind_attrs->[$i],
1742       );
1743     }
1744     else {
1745       $sth->bind_param(
1746         $i + 1,
1747         (ref $bind->[$i][1] and overload::Method($bind->[$i][1], '""'))
1748           ? "$bind->[$i][1]"
1749           : $bind->[$i][1]
1750         ,
1751         $bind_attrs->[$i],
1752       );
1753     }
1754   }
1755
1756   # Can this fail without throwing an exception anyways???
1757   my $rv = $sth->execute();
1758   $self->throw_exception(
1759     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1760   ) if !$rv;
1761
1762   $self->_query_end( $sql, $bind );
1763
1764   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1765 }
1766
1767 sub _prefetch_autovalues {
1768   my ($self, $source, $colinfo, $to_insert) = @_;
1769
1770   my %values;
1771   for my $col (keys %$colinfo) {
1772     if (
1773       $colinfo->{$col}{auto_nextval}
1774         and
1775       (
1776         ! exists $to_insert->{$col}
1777           or
1778         ref $to_insert->{$col} eq 'SCALAR'
1779           or
1780         (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1781       )
1782     ) {
1783       $values{$col} = $self->_sequence_fetch(
1784         'NEXTVAL',
1785         ( $colinfo->{$col}{sequence} ||=
1786             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1787         ),
1788       );
1789     }
1790   }
1791
1792   \%values;
1793 }
1794
1795 sub insert {
1796   my ($self, $source, $to_insert) = @_;
1797
1798   my $col_infos = $source->columns_info;
1799
1800   my $prefetched_values = $self->_prefetch_autovalues($source, $col_infos, $to_insert);
1801
1802   # fuse the values, but keep a separate list of prefetched_values so that
1803   # they can be fused once again with the final return
1804   $to_insert = { %$to_insert, %$prefetched_values };
1805
1806   # FIXME - we seem to assume undef values as non-supplied. This is wrong.
1807   # Investigate what does it take to s/defined/exists/
1808   my %pcols = map { $_ => 1 } $source->primary_columns;
1809   my (%retrieve_cols, $autoinc_supplied, $retrieve_autoinc_col);
1810   for my $col ($source->columns) {
1811     if ($col_infos->{$col}{is_auto_increment}) {
1812       $autoinc_supplied ||= 1 if defined $to_insert->{$col};
1813       $retrieve_autoinc_col ||= $col unless $autoinc_supplied;
1814     }
1815
1816     # nothing to retrieve when explicit values are supplied
1817     next if (defined $to_insert->{$col} and ! (
1818       ref $to_insert->{$col} eq 'SCALAR'
1819         or
1820       (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1821     ));
1822
1823     # the 'scalar keys' is a trick to preserve the ->columns declaration order
1824     $retrieve_cols{$col} = scalar keys %retrieve_cols if (
1825       $pcols{$col}
1826         or
1827       $col_infos->{$col}{retrieve_on_insert}
1828     );
1829   };
1830
1831   local $self->{_autoinc_supplied_for_op} = $autoinc_supplied;
1832   local $self->{_perform_autoinc_retrieval} = $retrieve_autoinc_col;
1833
1834   my ($sqla_opts, @ir_container);
1835   if (%retrieve_cols and $self->_use_insert_returning) {
1836     $sqla_opts->{returning_container} = \@ir_container
1837       if $self->_use_insert_returning_bound;
1838
1839     $sqla_opts->{returning} = [
1840       sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
1841     ];
1842   }
1843
1844   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
1845
1846   my %returned_cols = %$to_insert;
1847   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
1848     @ir_container = try {
1849       local $SIG{__WARN__} = sub {};
1850       my @r = $sth->fetchrow_array;
1851       $sth->finish;
1852       @r;
1853     } unless @ir_container;
1854
1855     @returned_cols{@$retlist} = @ir_container if @ir_container;
1856   }
1857   else {
1858     # pull in PK if needed and then everything else
1859     if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
1860
1861       $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
1862         unless $self->can('last_insert_id');
1863
1864       my @pri_values = $self->last_insert_id($source, @missing_pri);
1865
1866       $self->throw_exception( "Can't get last insert id" )
1867         unless (@pri_values == @missing_pri);
1868
1869       @returned_cols{@missing_pri} = @pri_values;
1870       delete $retrieve_cols{$_} for @missing_pri;
1871     }
1872
1873     # if there is more left to pull
1874     if (%retrieve_cols) {
1875       $self->throw_exception(
1876         'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
1877       ) unless %pcols;
1878
1879       my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
1880
1881       my $cur = DBIx::Class::ResultSet->new($source, {
1882         where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
1883         select => \@left_to_fetch,
1884       })->cursor;
1885
1886       @returned_cols{@left_to_fetch} = $cur->next;
1887
1888       $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
1889         if scalar $cur->next;
1890     }
1891   }
1892
1893   return { %$prefetched_values, %returned_cols };
1894 }
1895
1896 sub insert_bulk {
1897   my ($self, $source, $cols, $data) = @_;
1898
1899   my @col_range = (0..$#$cols);
1900
1901   # FIXME - perhaps this is not even needed? does DBI stringify?
1902   #
1903   # forcibly stringify whatever is stringifiable
1904   # ResultSet::populate() hands us a copy - safe to mangle
1905   for my $r (0 .. $#$data) {
1906     for my $c (0 .. $#{$data->[$r]}) {
1907       $data->[$r][$c] = "$data->[$r][$c]"
1908         if ( ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
1909     }
1910   }
1911
1912   my $colinfos = $source->columns_info($cols);
1913
1914   local $self->{_autoinc_supplied_for_op} =
1915     (first { $_->{is_auto_increment} } values %$colinfos)
1916       ? 1
1917       : 0
1918   ;
1919
1920   # get a slice type index based on first row of data
1921   # a "column" in this context may refer to more than one bind value
1922   # e.g. \[ '?, ?', [...], [...] ]
1923   #
1924   # construct the value type index - a description of values types for every
1925   # per-column slice of $data:
1926   #
1927   # nonexistent - nonbind literal
1928   # 0 - regular value
1929   # [] of bindattrs - resolved attribute(s) of bind(s) passed via literal+bind \[] combo
1930   #
1931   # also construct the column hash to pass to the SQL generator. For plain
1932   # (non literal) values - convert the members of the first row into a
1933   # literal+bind combo, with extra positional info in the bind attr hashref.
1934   # This will allow us to match the order properly, and is so contrived
1935   # because a user-supplied literal/bind (or something else specific to a
1936   # resultsource and/or storage driver) can inject extra binds along the
1937   # way, so one can't rely on "shift positions" ordering at all. Also we
1938   # can't just hand SQLA a set of some known "values" (e.g. hashrefs that
1939   # can be later matched up by address), because we want to supply a real
1940   # value on which perhaps e.g. datatype checks will be performed
1941   my ($proto_data, $value_type_by_col_idx);
1942   for my $i (@col_range) {
1943     my $colname = $cols->[$i];
1944     if (ref $data->[0][$i] eq 'SCALAR') {
1945       # no bind value at all - no type
1946
1947       $proto_data->{$colname} = $data->[0][$i];
1948     }
1949     elsif (ref $data->[0][$i] eq 'REF' and ref ${$data->[0][$i]} eq 'ARRAY' ) {
1950       # repack, so we don't end up mangling the original \[]
1951       my ($sql, @bind) = @${$data->[0][$i]};
1952
1953       # normalization of user supplied stuff
1954       my $resolved_bind = $self->_resolve_bindattrs(
1955         $source, \@bind, $colinfos,
1956       );
1957
1958       # store value-less (attrs only) bind info - we will be comparing all
1959       # supplied binds against this for sanity
1960       $value_type_by_col_idx->{$i} = [ map { $_->[0] } @$resolved_bind ];
1961
1962       $proto_data->{$colname} = \[ $sql, map { [
1963         # inject slice order to use for $proto_bind construction
1964           { %{$resolved_bind->[$_][0]}, _bind_data_slice_idx => $i, _literal_bind_subindex => $_+1 }
1965             =>
1966           $resolved_bind->[$_][1]
1967         ] } (0 .. $#bind)
1968       ];
1969     }
1970     else {
1971       $value_type_by_col_idx->{$i} = undef;
1972
1973       $proto_data->{$colname} = \[ '?', [
1974         { dbic_colname => $colname, _bind_data_slice_idx => $i }
1975           =>
1976         $data->[0][$i]
1977       ] ];
1978     }
1979   }
1980
1981   my ($sql, $proto_bind) = $self->_prep_for_execute (
1982     'insert',
1983     $source,
1984     [ $proto_data ],
1985   );
1986
1987   if (! @$proto_bind and keys %$value_type_by_col_idx) {
1988     # if the bindlist is empty and we had some dynamic binds, this means the
1989     # storage ate them away (e.g. the NoBindVars component) and interpolated
1990     # them directly into the SQL. This obviously can't be good for multi-inserts
1991     $self->throw_exception('Cannot insert_bulk without support for placeholders');
1992   }
1993
1994   # sanity checks
1995   # FIXME - devise a flag "no babysitting" or somesuch to shut this off
1996   #
1997   # use an error reporting closure for convenience (less to pass)
1998   my $bad_slice_report_cref = sub {
1999     my ($msg, $r_idx, $c_idx) = @_;
2000     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
2001       $msg,
2002       $cols->[$c_idx],
2003       do {
2004         require Data::Dumper::Concise;
2005         local $Data::Dumper::Maxdepth = 5;
2006         Data::Dumper::Concise::Dumper ({
2007           map { $cols->[$_] =>
2008             $data->[$r_idx][$_]
2009           } @col_range
2010         }),
2011       }
2012     );
2013   };
2014
2015   for my $col_idx (@col_range) {
2016     my $reference_val = $data->[0][$col_idx];
2017
2018     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
2019       my $val = $data->[$row_idx][$col_idx];
2020
2021       if (! exists $value_type_by_col_idx->{$col_idx}) { # literal no binds
2022         if (ref $val ne 'SCALAR') {
2023           $bad_slice_report_cref->(
2024             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
2025             $row_idx,
2026             $col_idx,
2027           );
2028         }
2029         elsif ($$val ne $$reference_val) {
2030           $bad_slice_report_cref->(
2031             "Inconsistent literal SQL value (expecting \\'$$reference_val')",
2032             $row_idx,
2033             $col_idx,
2034           );
2035         }
2036       }
2037       elsif (! defined $value_type_by_col_idx->{$col_idx} ) {  # regular non-literal value
2038         if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
2039           $bad_slice_report_cref->("Literal SQL found where a plain bind value is expected", $row_idx, $col_idx);
2040         }
2041       }
2042       else {  # binds from a \[], compare type and attrs
2043         if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
2044           $bad_slice_report_cref->(
2045             "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
2046             $row_idx,
2047             $col_idx,
2048           );
2049         }
2050         # start drilling down and bail out early on identical refs
2051         elsif (
2052           $reference_val != $val
2053             or
2054           $$reference_val != $$val
2055         ) {
2056           if (${$val}->[0] ne ${$reference_val}->[0]) {
2057             $bad_slice_report_cref->(
2058               "Inconsistent literal/bind SQL (expecting \\['${$reference_val}->[0]', ... ])",
2059               $row_idx,
2060               $col_idx,
2061             );
2062           }
2063           # need to check the bind attrs - a bind will happen only once for
2064           # the entire dataset, so any changes further down will be ignored.
2065           elsif (! Data::Compare::Compare(
2066             $value_type_by_col_idx->{$col_idx},
2067             [
2068               map
2069               { $_->[0] }
2070               @{$self->_resolve_bindattrs(
2071                 $source, [ @{$$val}[1 .. $#$$val] ], $colinfos,
2072               )}
2073             ],
2074           )) {
2075             $bad_slice_report_cref->(
2076               'Differing bind attributes on literal/bind values not supported',
2077               $row_idx,
2078               $col_idx,
2079             );
2080           }
2081         }
2082       }
2083     }
2084   }
2085
2086   # neither _dbh_execute_for_fetch, nor _dbh_execute_inserts_with_no_binds
2087   # are atomic (even if execute_for_fetch is a single call). Thus a safety
2088   # scope guard
2089   my $guard = $self->txn_scope_guard;
2090
2091   $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
2092   my $sth = $self->_sth($sql);
2093   my $rv = do {
2094     if (@$proto_bind) {
2095       # proto bind contains the information on which pieces of $data to pull
2096       # $cols is passed in only for prettier error-reporting
2097       $self->_dbh_execute_for_fetch( $source, $sth, $proto_bind, $cols, $data );
2098     }
2099     else {
2100       # bind_param_array doesn't work if there are no binds
2101       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
2102     }
2103   };
2104
2105   $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
2106
2107   $guard->commit;
2108
2109   return wantarray ? ($rv, $sth, @$proto_bind) : $rv;
2110 }
2111
2112 # execute_for_fetch is capable of returning data just fine (it means it
2113 # can be used for INSERT...RETURNING and UPDATE...RETURNING. Since this
2114 # is the void-populate fast-path we will just ignore this altogether
2115 # for the time being.
2116 sub _dbh_execute_for_fetch {
2117   my ($self, $source, $sth, $proto_bind, $cols, $data) = @_;
2118
2119   my @idx_range = ( 0 .. $#$proto_bind );
2120
2121   # If we have any bind attributes to take care of, we will bind the
2122   # proto-bind data (which will never be used by execute_for_fetch)
2123   # However since column bindtypes are "sticky", this is sufficient
2124   # to get the DBD to apply the bindtype to all values later on
2125
2126   my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
2127
2128   for my $i (@idx_range) {
2129     $sth->bind_param (
2130       $i+1, # DBI bind indexes are 1-based
2131       $proto_bind->[$i][1],
2132       $bind_attrs->[$i],
2133     ) if defined $bind_attrs->[$i];
2134   }
2135
2136   # At this point $data slots named in the _bind_data_slice_idx of
2137   # each piece of $proto_bind are either \[]s or plain values to be
2138   # passed in. Construct the dispensing coderef. *NOTE* the order
2139   # of $data will differ from this of the ?s in the SQL (due to
2140   # alphabetical ordering by colname). We actually do want to
2141   # preserve this behavior so that prepare_cached has a better
2142   # chance of matching on unrelated calls
2143
2144   my $fetch_row_idx = -1; # saner loop this way
2145   my $fetch_tuple = sub {
2146     return undef if ++$fetch_row_idx > $#$data;
2147
2148     return [ map { defined $_->{_literal_bind_subindex}
2149       ? ${ $data->[ $fetch_row_idx ]->[ $_->{_bind_data_slice_idx} ]}
2150          ->[ $_->{_literal_bind_subindex} ]
2151           ->[1]
2152       : $data->[ $fetch_row_idx ]->[ $_->{_bind_data_slice_idx} ]
2153     } map { $_->[0] } @$proto_bind];
2154   };
2155
2156   my $tuple_status = [];
2157   my ($rv, $err);
2158   try {
2159     $rv = $sth->execute_for_fetch(
2160       $fetch_tuple,
2161       $tuple_status,
2162     );
2163   }
2164   catch {
2165     $err = shift;
2166   };
2167
2168   # Not all DBDs are create equal. Some throw on error, some return
2169   # an undef $rv, and some set $sth->err - try whatever we can
2170   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
2171     ! defined $err
2172       and
2173     ( !defined $rv or $sth->err )
2174   );
2175
2176   # Statement must finish even if there was an exception.
2177   try {
2178     $sth->finish
2179   }
2180   catch {
2181     $err = shift unless defined $err
2182   };
2183
2184   if (defined $err) {
2185     my $i = 0;
2186     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
2187
2188     $self->throw_exception("Unexpected populate error: $err")
2189       if ($i > $#$tuple_status);
2190
2191     require Data::Dumper::Concise;
2192     $self->throw_exception(sprintf "execute_for_fetch() aborted with '%s' at populate slice:\n%s",
2193       ($tuple_status->[$i][1] || $err),
2194       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
2195     );
2196   }
2197
2198   return $rv;
2199 }
2200
2201 sub _dbh_execute_inserts_with_no_binds {
2202   my ($self, $sth, $count) = @_;
2203
2204   my $err;
2205   try {
2206     my $dbh = $self->_get_dbh;
2207     local $dbh->{RaiseError} = 1;
2208     local $dbh->{PrintError} = 0;
2209
2210     $sth->execute foreach 1..$count;
2211   }
2212   catch {
2213     $err = shift;
2214   };
2215
2216   # Make sure statement is finished even if there was an exception.
2217   try {
2218     $sth->finish
2219   }
2220   catch {
2221     $err = shift unless defined $err;
2222   };
2223
2224   $self->throw_exception($err) if defined $err;
2225
2226   return $count;
2227 }
2228
2229 sub update {
2230   #my ($self, $source, @args) = @_;
2231   shift->_execute('update', @_);
2232 }
2233
2234
2235 sub delete {
2236   #my ($self, $source, @args) = @_;
2237   shift->_execute('delete', @_);
2238 }
2239
2240 sub _select {
2241   my $self = shift;
2242   $self->_execute($self->_select_args(@_));
2243 }
2244
2245 sub _select_args_to_query {
2246   my $self = shift;
2247
2248   $self->throw_exception(
2249     "Unable to generate limited query representation with 'software_limit' enabled"
2250   ) if ($_[3]->{software_limit} and ($_[3]->{offset} or $_[3]->{rows}) );
2251
2252   # my ($op, $ident, $select, $cond, $rs_attrs, $rows, $offset)
2253   #  = $self->_select_args($ident, $select, $cond, $attrs);
2254   my ($op, $ident, @args) =
2255     $self->_select_args(@_);
2256
2257   # my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2258   my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, \@args);
2259   $prepared_bind ||= [];
2260
2261   return wantarray
2262     ? ($sql, $prepared_bind)
2263     : \[ "($sql)", @$prepared_bind ]
2264   ;
2265 }
2266
2267 sub _select_args {
2268   my ($self, $ident, $select, $where, $attrs) = @_;
2269
2270   my $sql_maker = $self->sql_maker;
2271   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2272
2273   $attrs = {
2274     %$attrs,
2275     select => $select,
2276     from => $ident,
2277     where => $where,
2278     $rs_alias && $alias2source->{$rs_alias}
2279       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2280       : ()
2281     ,
2282   };
2283
2284   # Sanity check the attributes (SQLMaker does it too, but
2285   # in case of a software_limit we'll never reach there)
2286   if (defined $attrs->{offset}) {
2287     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2288       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2289   }
2290
2291   if (defined $attrs->{rows}) {
2292     $self->throw_exception("The rows attribute must be a positive integer if present")
2293       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2294   }
2295   elsif ($attrs->{offset}) {
2296     # MySQL actually recommends this approach.  I cringe.
2297     $attrs->{rows} = $sql_maker->__max_int;
2298   }
2299
2300   my @limit;
2301
2302   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2303   # storage, unless software limit was requested
2304   if (
2305     #limited has_many
2306     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2307        ||
2308     # grouped prefetch (to satisfy group_by == select)
2309     ( $attrs->{group_by}
2310         &&
2311       @{$attrs->{group_by}}
2312         &&
2313       $attrs->{_prefetch_selector_range}
2314     )
2315   ) {
2316     ($ident, $select, $where, $attrs)
2317       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2318   }
2319   elsif (! $attrs->{software_limit} ) {
2320     push @limit, (
2321       $attrs->{rows} || (),
2322       $attrs->{offset} || (),
2323     );
2324   }
2325
2326   # try to simplify the joinmap further (prune unreferenced type-single joins)
2327   if (
2328     ref $ident
2329       and
2330     reftype $ident eq 'ARRAY'
2331       and
2332     @$ident != 1
2333   ) {
2334     $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2335   }
2336
2337 ###
2338   # This would be the point to deflate anything found in $where
2339   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2340   # expect a result object. And all we have is a resultsource (it is trivial
2341   # to extract deflator coderefs via $alias2source above).
2342   #
2343   # I don't see a way forward other than changing the way deflators are
2344   # invoked, and that's just bad...
2345 ###
2346
2347   return ('select', $ident, $select, $where, $attrs, @limit);
2348 }
2349
2350 # Returns a counting SELECT for a simple count
2351 # query. Abstracted so that a storage could override
2352 # this to { count => 'firstcol' } or whatever makes
2353 # sense as a performance optimization
2354 sub _count_select {
2355   #my ($self, $source, $rs_attrs) = @_;
2356   return { count => '*' };
2357 }
2358
2359 =head2 select
2360
2361 =over 4
2362
2363 =item Arguments: $ident, $select, $condition, $attrs
2364
2365 =back
2366
2367 Handle a SQL select statement.
2368
2369 =cut
2370
2371 sub select {
2372   my $self = shift;
2373   my ($ident, $select, $condition, $attrs) = @_;
2374   return $self->cursor_class->new($self, \@_, $attrs);
2375 }
2376
2377 sub select_single {
2378   my $self = shift;
2379   my ($rv, $sth, @bind) = $self->_select(@_);
2380   my @row = $sth->fetchrow_array;
2381   my @nextrow = $sth->fetchrow_array if @row;
2382   if(@row && @nextrow) {
2383     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2384   }
2385   # Need to call finish() to work round broken DBDs
2386   $sth->finish();
2387   return @row;
2388 }
2389
2390 =head2 sql_limit_dialect
2391
2392 This is an accessor for the default SQL limit dialect used by a particular
2393 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2394 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2395 see L<DBIx::Class::SQLMaker::LimitDialects>.
2396
2397 =cut
2398
2399 sub _dbh_sth {
2400   my ($self, $dbh, $sql) = @_;
2401
2402   # 3 is the if_active parameter which avoids active sth re-use
2403   my $sth = $self->disable_sth_caching
2404     ? $dbh->prepare($sql)
2405     : $dbh->prepare_cached($sql, {}, 3);
2406
2407   # XXX You would think RaiseError would make this impossible,
2408   #  but apparently that's not true :(
2409   $self->throw_exception(
2410     $dbh->errstr
2411       ||
2412     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
2413             .'an exception and/or setting $dbh->errstr',
2414       length ($sql) > 20
2415         ? substr($sql, 0, 20) . '...'
2416         : $sql
2417       ,
2418       'DBD::' . $dbh->{Driver}{Name},
2419     )
2420   ) if !$sth;
2421
2422   $sth;
2423 }
2424
2425 sub sth {
2426   carp_unique 'sth was mistakenly marked/documented as public, stop calling it (will be removed before DBIC v0.09)';
2427   shift->_sth(@_);
2428 }
2429
2430 sub _sth {
2431   my ($self, $sql) = @_;
2432   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2433 }
2434
2435 sub _dbh_columns_info_for {
2436   my ($self, $dbh, $table) = @_;
2437
2438   if ($dbh->can('column_info')) {
2439     my %result;
2440     my $caught;
2441     try {
2442       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2443       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2444       $sth->execute();
2445       while ( my $info = $sth->fetchrow_hashref() ){
2446         my %column_info;
2447         $column_info{data_type}   = $info->{TYPE_NAME};
2448         $column_info{size}      = $info->{COLUMN_SIZE};
2449         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2450         $column_info{default_value} = $info->{COLUMN_DEF};
2451         my $col_name = $info->{COLUMN_NAME};
2452         $col_name =~ s/^\"(.*)\"$/$1/;
2453
2454         $result{$col_name} = \%column_info;
2455       }
2456     } catch {
2457       $caught = 1;
2458     };
2459     return \%result if !$caught && scalar keys %result;
2460   }
2461
2462   my %result;
2463   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2464   $sth->execute;
2465   my @columns = @{$sth->{NAME_lc}};
2466   for my $i ( 0 .. $#columns ){
2467     my %column_info;
2468     $column_info{data_type} = $sth->{TYPE}->[$i];
2469     $column_info{size} = $sth->{PRECISION}->[$i];
2470     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2471
2472     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2473       $column_info{data_type} = $1;
2474       $column_info{size}    = $2;
2475     }
2476
2477     $result{$columns[$i]} = \%column_info;
2478   }
2479   $sth->finish;
2480
2481   foreach my $col (keys %result) {
2482     my $colinfo = $result{$col};
2483     my $type_num = $colinfo->{data_type};
2484     my $type_name;
2485     if(defined $type_num && $dbh->can('type_info')) {
2486       my $type_info = $dbh->type_info($type_num);
2487       $type_name = $type_info->{TYPE_NAME} if $type_info;
2488       $colinfo->{data_type} = $type_name if $type_name;
2489     }
2490   }
2491
2492   return \%result;
2493 }
2494
2495 sub columns_info_for {
2496   my ($self, $table) = @_;
2497   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2498 }
2499
2500 =head2 last_insert_id
2501
2502 Return the row id of the last insert.
2503
2504 =cut
2505
2506 sub _dbh_last_insert_id {
2507     my ($self, $dbh, $source, $col) = @_;
2508
2509     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2510
2511     return $id if defined $id;
2512
2513     my $class = ref $self;
2514     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2515 }
2516
2517 sub last_insert_id {
2518   my $self = shift;
2519   $self->_dbh_last_insert_id ($self->_dbh, @_);
2520 }
2521
2522 =head2 _native_data_type
2523
2524 =over 4
2525
2526 =item Arguments: $type_name
2527
2528 =back
2529
2530 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2531 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2532 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2533
2534 The default implementation returns C<undef>, implement in your Storage driver if
2535 you need this functionality.
2536
2537 Should map types from other databases to the native RDBMS type, for example
2538 C<VARCHAR2> to C<VARCHAR>.
2539
2540 Types with modifiers should map to the underlying data type. For example,
2541 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2542
2543 Composite types should map to the container type, for example
2544 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2545
2546 =cut
2547
2548 sub _native_data_type {
2549   #my ($self, $data_type) = @_;
2550   return undef
2551 }
2552
2553 # Check if placeholders are supported at all
2554 sub _determine_supports_placeholders {
2555   my $self = shift;
2556   my $dbh  = $self->_get_dbh;
2557
2558   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2559   # but it is inaccurate more often than not
2560   return try {
2561     local $dbh->{PrintError} = 0;
2562     local $dbh->{RaiseError} = 1;
2563     $dbh->do('select ?', {}, 1);
2564     1;
2565   }
2566   catch {
2567     0;
2568   };
2569 }
2570
2571 # Check if placeholders bound to non-string types throw exceptions
2572 #
2573 sub _determine_supports_typeless_placeholders {
2574   my $self = shift;
2575   my $dbh  = $self->_get_dbh;
2576
2577   return try {
2578     local $dbh->{PrintError} = 0;
2579     local $dbh->{RaiseError} = 1;
2580     # this specifically tests a bind that is NOT a string
2581     $dbh->do('select 1 where 1 = ?', {}, 1);
2582     1;
2583   }
2584   catch {
2585     0;
2586   };
2587 }
2588
2589 =head2 sqlt_type
2590
2591 Returns the database driver name.
2592
2593 =cut
2594
2595 sub sqlt_type {
2596   shift->_get_dbh->{Driver}->{Name};
2597 }
2598
2599 =head2 bind_attribute_by_data_type
2600
2601 Given a datatype from column info, returns a database specific bind
2602 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2603 let the database planner just handle it.
2604
2605 This method is always called after the driver has been determined and a DBI
2606 connection has been established. Therefore you can refer to C<DBI::$constant>
2607 and/or C<DBD::$driver::$constant> directly, without worrying about loading
2608 the correct modules.
2609
2610 =cut
2611
2612 sub bind_attribute_by_data_type {
2613     return;
2614 }
2615
2616 =head2 is_datatype_numeric
2617
2618 Given a datatype from column_info, returns a boolean value indicating if
2619 the current RDBMS considers it a numeric value. This controls how
2620 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2621 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2622 be performed instead of the usual C<eq>.
2623
2624 =cut
2625
2626 sub is_datatype_numeric {
2627   #my ($self, $dt) = @_;
2628
2629   return 0 unless $_[1];
2630
2631   $_[1] =~ /^ (?:
2632     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2633   ) $/ix;
2634 }
2635
2636
2637 =head2 create_ddl_dir
2638
2639 =over 4
2640
2641 =item Arguments: $schema, \@databases, $version, $directory, $preversion, \%sqlt_args
2642
2643 =back
2644
2645 Creates a SQL file based on the Schema, for each of the specified
2646 database engines in C<\@databases> in the given directory.
2647 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2648
2649 Given a previous version number, this will also create a file containing
2650 the ALTER TABLE statements to transform the previous schema into the
2651 current one. Note that these statements may contain C<DROP TABLE> or
2652 C<DROP COLUMN> statements that can potentially destroy data.
2653
2654 The file names are created using the C<ddl_filename> method below, please
2655 override this method in your schema if you would like a different file
2656 name format. For the ALTER file, the same format is used, replacing
2657 $version in the name with "$preversion-$version".
2658
2659 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2660 The most common value for this would be C<< { add_drop_table => 1 } >>
2661 to have the SQL produced include a C<DROP TABLE> statement for each table
2662 created. For quoting purposes supply C<quote_table_names> and
2663 C<quote_field_names>.
2664
2665 If no arguments are passed, then the following default values are assumed:
2666
2667 =over 4
2668
2669 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2670
2671 =item version    - $schema->schema_version
2672
2673 =item directory  - './'
2674
2675 =item preversion - <none>
2676
2677 =back
2678
2679 By default, C<\%sqlt_args> will have
2680
2681  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2682
2683 merged with the hash passed in. To disable any of those features, pass in a
2684 hashref like the following
2685
2686  { ignore_constraint_names => 0, # ... other options }
2687
2688
2689 WARNING: You are strongly advised to check all SQL files created, before applying
2690 them.
2691
2692 =cut
2693
2694 sub create_ddl_dir {
2695   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2696
2697   unless ($dir) {
2698     carp "No directory given, using ./\n";
2699     $dir = './';
2700   } else {
2701       -d $dir
2702         or
2703       (require File::Path and File::Path::mkpath (["$dir"]))  # mkpath does not like objects (i.e. Path::Class::Dir)
2704         or
2705       $self->throw_exception(
2706         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2707       );
2708   }
2709
2710   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2711
2712   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2713   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2714
2715   my $schema_version = $schema->schema_version || '1.x';
2716   $version ||= $schema_version;
2717
2718   $sqltargs = {
2719     add_drop_table => 1,
2720     ignore_constraint_names => 1,
2721     ignore_index_names => 1,
2722     %{$sqltargs || {}}
2723   };
2724
2725   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2726     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2727   }
2728
2729   my $sqlt = SQL::Translator->new( $sqltargs );
2730
2731   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2732   my $sqlt_schema = $sqlt->translate({ data => $schema })
2733     or $self->throw_exception ($sqlt->error);
2734
2735   foreach my $db (@$databases) {
2736     $sqlt->reset();
2737     $sqlt->{schema} = $sqlt_schema;
2738     $sqlt->producer($db);
2739
2740     my $file;
2741     my $filename = $schema->ddl_filename($db, $version, $dir);
2742     if (-e $filename && ($version eq $schema_version )) {
2743       # if we are dumping the current version, overwrite the DDL
2744       carp "Overwriting existing DDL file - $filename";
2745       unlink($filename);
2746     }
2747
2748     my $output = $sqlt->translate;
2749     if(!$output) {
2750       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2751       next;
2752     }
2753     if(!open($file, ">$filename")) {
2754       $self->throw_exception("Can't open $filename for writing ($!)");
2755       next;
2756     }
2757     print $file $output;
2758     close($file);
2759
2760     next unless ($preversion);
2761
2762     require SQL::Translator::Diff;
2763
2764     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2765     if(!-e $prefilename) {
2766       carp("No previous schema file found ($prefilename)");
2767       next;
2768     }
2769
2770     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2771     if(-e $difffile) {
2772       carp("Overwriting existing diff file - $difffile");
2773       unlink($difffile);
2774     }
2775
2776     my $source_schema;
2777     {
2778       my $t = SQL::Translator->new($sqltargs);
2779       $t->debug( 0 );
2780       $t->trace( 0 );
2781
2782       $t->parser( $db )
2783         or $self->throw_exception ($t->error);
2784
2785       my $out = $t->translate( $prefilename )
2786         or $self->throw_exception ($t->error);
2787
2788       $source_schema = $t->schema;
2789
2790       $source_schema->name( $prefilename )
2791         unless ( $source_schema->name );
2792     }
2793
2794     # The "new" style of producers have sane normalization and can support
2795     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2796     # And we have to diff parsed SQL against parsed SQL.
2797     my $dest_schema = $sqlt_schema;
2798
2799     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2800       my $t = SQL::Translator->new($sqltargs);
2801       $t->debug( 0 );
2802       $t->trace( 0 );
2803
2804       $t->parser( $db )
2805         or $self->throw_exception ($t->error);
2806
2807       my $out = $t->translate( $filename )
2808         or $self->throw_exception ($t->error);
2809
2810       $dest_schema = $t->schema;
2811
2812       $dest_schema->name( $filename )
2813         unless $dest_schema->name;
2814     }
2815
2816     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2817                                                   $dest_schema,   $db,
2818                                                   $sqltargs
2819                                                  );
2820     if(!open $file, ">$difffile") {
2821       $self->throw_exception("Can't write to $difffile ($!)");
2822       next;
2823     }
2824     print $file $diff;
2825     close($file);
2826   }
2827 }
2828
2829 =head2 deployment_statements
2830
2831 =over 4
2832
2833 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2834
2835 =back
2836
2837 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2838
2839 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2840 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2841
2842 C<$directory> is used to return statements from files in a previously created
2843 L</create_ddl_dir> directory and is optional. The filenames are constructed
2844 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2845
2846 If no C<$directory> is specified then the statements are constructed on the
2847 fly using L<SQL::Translator> and C<$version> is ignored.
2848
2849 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2850
2851 =cut
2852
2853 sub deployment_statements {
2854   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2855   $type ||= $self->sqlt_type;
2856   $version ||= $schema->schema_version || '1.x';
2857   $dir ||= './';
2858   my $filename = $schema->ddl_filename($type, $version, $dir);
2859   if(-f $filename)
2860   {
2861       # FIXME replace this block when a proper sane sql parser is available
2862       my $file;
2863       open($file, "<$filename")
2864         or $self->throw_exception("Can't open $filename ($!)");
2865       my @rows = <$file>;
2866       close($file);
2867       return join('', @rows);
2868   }
2869
2870   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2871     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2872   }
2873
2874   # sources needs to be a parser arg, but for simplicty allow at top level
2875   # coming in
2876   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2877       if exists $sqltargs->{sources};
2878
2879   my $tr = SQL::Translator->new(
2880     producer => "SQL::Translator::Producer::${type}",
2881     %$sqltargs,
2882     parser => 'SQL::Translator::Parser::DBIx::Class',
2883     data => $schema,
2884   );
2885
2886   return preserve_context {
2887     $tr->translate
2888   } after => sub {
2889     $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2890       unless defined $_[0];
2891   };
2892 }
2893
2894 # FIXME deploy() currently does not accurately report sql errors
2895 # Will always return true while errors are warned
2896 sub deploy {
2897   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2898   my $deploy = sub {
2899     my $line = shift;
2900     return if(!$line);
2901     return if($line =~ /^--/);
2902     # next if($line =~ /^DROP/m);
2903     return if($line =~ /^BEGIN TRANSACTION/m);
2904     return if($line =~ /^COMMIT/m);
2905     return if $line =~ /^\s+$/; # skip whitespace only
2906     $self->_query_start($line);
2907     try {
2908       # do a dbh_do cycle here, as we need some error checking in
2909       # place (even though we will ignore errors)
2910       $self->dbh_do (sub { $_[1]->do($line) });
2911     } catch {
2912       carp qq{$_ (running "${line}")};
2913     };
2914     $self->_query_end($line);
2915   };
2916   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2917   if (@statements > 1) {
2918     foreach my $statement (@statements) {
2919       $deploy->( $statement );
2920     }
2921   }
2922   elsif (@statements == 1) {
2923     # split on single line comments and end of statements
2924     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
2925       $deploy->( $line );
2926     }
2927   }
2928 }
2929
2930 =head2 datetime_parser
2931
2932 Returns the datetime parser class
2933
2934 =cut
2935
2936 sub datetime_parser {
2937   my $self = shift;
2938   return $self->{datetime_parser} ||= do {
2939     $self->build_datetime_parser(@_);
2940   };
2941 }
2942
2943 =head2 datetime_parser_type
2944
2945 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
2946
2947 =head2 build_datetime_parser
2948
2949 See L</datetime_parser>
2950
2951 =cut
2952
2953 sub build_datetime_parser {
2954   my $self = shift;
2955   my $type = $self->datetime_parser_type(@_);
2956   return $type;
2957 }
2958
2959
2960 =head2 is_replicating
2961
2962 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2963 replicate from a master database.  Default is undef, which is the result
2964 returned by databases that don't support replication.
2965
2966 =cut
2967
2968 sub is_replicating {
2969     return;
2970
2971 }
2972
2973 =head2 lag_behind_master
2974
2975 Returns a number that represents a certain amount of lag behind a master db
2976 when a given storage is replicating.  The number is database dependent, but
2977 starts at zero and increases with the amount of lag. Default in undef
2978
2979 =cut
2980
2981 sub lag_behind_master {
2982     return;
2983 }
2984
2985 =head2 relname_to_table_alias
2986
2987 =over 4
2988
2989 =item Arguments: $relname, $join_count
2990
2991 =item Return Value: $alias
2992
2993 =back
2994
2995 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2996 queries.
2997
2998 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2999 way these aliases are named.
3000
3001 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
3002 otherwise C<"$relname">.
3003
3004 =cut
3005
3006 sub relname_to_table_alias {
3007   my ($self, $relname, $join_count) = @_;
3008
3009   my $alias = ($join_count && $join_count > 1 ?
3010     join('_', $relname, $join_count) : $relname);
3011
3012   return $alias;
3013 }
3014
3015 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
3016 # version and it may be necessary to amend or override it for a specific storage
3017 # if such binds are necessary.
3018 sub _max_column_bytesize {
3019   my ($self, $attr) = @_;
3020
3021   my $max_size;
3022
3023   if ($attr->{sqlt_datatype}) {
3024     my $data_type = lc($attr->{sqlt_datatype});
3025
3026     if ($attr->{sqlt_size}) {
3027
3028       # String/sized-binary types
3029       if ($data_type =~ /^(?:
3030           l? (?:var)? char(?:acter)? (?:\s*varying)?
3031             |
3032           (?:var)? binary (?:\s*varying)?
3033             |
3034           raw
3035         )\b/x
3036       ) {
3037         $max_size = $attr->{sqlt_size};
3038       }
3039       # Other charset/unicode types, assume scale of 4
3040       elsif ($data_type =~ /^(?:
3041           national \s* character (?:\s*varying)?
3042             |
3043           nchar
3044             |
3045           univarchar
3046             |
3047           nvarchar
3048         )\b/x
3049       ) {
3050         $max_size = $attr->{sqlt_size} * 4;
3051       }
3052     }
3053
3054     if (!$max_size and !$self->_is_lob_type($data_type)) {
3055       $max_size = 100 # for all other (numeric?) datatypes
3056     }
3057   }
3058
3059   $max_size || $self->_dbic_connect_attributes->{LongReadLen} || $self->_get_dbh->{LongReadLen} || 8000;
3060 }
3061
3062 # Determine if a data_type is some type of BLOB
3063 sub _is_lob_type {
3064   my ($self, $data_type) = @_;
3065   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
3066     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
3067                                   |varchar|character\s*varying|nvarchar
3068                                   |national\s*character\s*varying))?\z/xi);
3069 }
3070
3071 sub _is_binary_lob_type {
3072   my ($self, $data_type) = @_;
3073   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
3074     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
3075 }
3076
3077 sub _is_text_lob_type {
3078   my ($self, $data_type) = @_;
3079   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
3080     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
3081                         |national\s*character\s*varying))\z/xi);
3082 }
3083
3084 # Determine if a data_type is some type of a binary type
3085 sub _is_binary_type {
3086   my ($self, $data_type) = @_;
3087   $data_type && ($self->_is_binary_lob_type($data_type)
3088     || $data_type =~ /(?:var)?(?:binary|bit|graphic)(?:\s*varying)?/i);
3089 }
3090
3091 1;
3092
3093 =head1 USAGE NOTES
3094
3095 =head2 DBIx::Class and AutoCommit
3096
3097 DBIx::Class can do some wonderful magic with handling exceptions,
3098 disconnections, and transactions when you use C<< AutoCommit => 1 >>
3099 (the default) combined with L<txn_do|DBIx::Class::Storage/txn_do> for
3100 transaction support.
3101
3102 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
3103 in an assumed transaction between commits, and you're telling us you'd
3104 like to manage that manually.  A lot of the magic protections offered by
3105 this module will go away.  We can't protect you from exceptions due to database
3106 disconnects because we don't know anything about how to restart your
3107 transactions.  You're on your own for handling all sorts of exceptional
3108 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
3109 be with raw DBI.
3110
3111
3112 =head1 AUTHOR AND CONTRIBUTORS
3113
3114 See L<AUTHOR|DBIx::Class/AUTHOR> and L<CONTRIBUTORS|DBIx::Class/CONTRIBUTORS> in DBIx::Class
3115
3116 =head1 LICENSE
3117
3118 You may distribute this code under the same terms as Perl itself.
3119
3120 =cut