Sanify _determine_driver handling in ::Storage::DBI
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base 'DBIx::Class::Storage';
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17
18 # what version of sqlt do we require if deploy() without a ddl_dir is invoked
19 # when changing also adjust the corresponding author_require in Makefile.PL
20 my $minimum_sqlt_version = '0.11002';
21
22
23 __PACKAGE__->mk_group_accessors('simple' =>
24   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
25      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
26 );
27
28 # the values for these accessors are picked out (and deleted) from
29 # the attribute hashref passed to connect_info
30 my @storage_options = qw/
31   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
32   disable_sth_caching unsafe auto_savepoint
33 /;
34 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
35
36
37 # default cursor class, overridable in connect_info attributes
38 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
39
40 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
41 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
42
43
44 # Each of these methods need _determine_driver called before itself
45 # in order to function reliably. This is a purely DRY optimization
46 my @rdbms_specific_methods = qw/
47   sqlt_type
48   build_datetime_parser
49   datetime_parser_type
50
51   insert
52   insert_bulk
53   update
54   delete
55   select
56   select_single
57 /;
58
59 for my $meth (@rdbms_specific_methods) {
60
61   my $orig = __PACKAGE__->can ($meth)
62     or next;
63
64   no strict qw/refs/;
65   no warnings qw/redefine/;
66   *{__PACKAGE__ ."::$meth"} = sub {
67     if (not $_[0]->_driver_determined) {
68       $_[0]->_determine_driver;
69       goto $_[0]->can($meth);
70     }
71     $orig->(@_);
72   };
73 }
74
75
76 =head1 NAME
77
78 DBIx::Class::Storage::DBI - DBI storage handler
79
80 =head1 SYNOPSIS
81
82   my $schema = MySchema->connect('dbi:SQLite:my.db');
83
84   $schema->storage->debug(1);
85
86   my @stuff = $schema->storage->dbh_do(
87     sub {
88       my ($storage, $dbh, @args) = @_;
89       $dbh->do("DROP TABLE authors");
90     },
91     @column_list
92   );
93
94   $schema->resultset('Book')->search({
95      written_on => $schema->storage->datetime_parser(DateTime->now)
96   });
97
98 =head1 DESCRIPTION
99
100 This class represents the connection to an RDBMS via L<DBI>.  See
101 L<DBIx::Class::Storage> for general information.  This pod only
102 documents DBI-specific methods and behaviors.
103
104 =head1 METHODS
105
106 =cut
107
108 sub new {
109   my $new = shift->next::method(@_);
110
111   $new->transaction_depth(0);
112   $new->_sql_maker_opts({});
113   $new->{savepoints} = [];
114   $new->{_in_dbh_do} = 0;
115   $new->{_dbh_gen} = 0;
116
117   $new;
118 }
119
120 =head2 connect_info
121
122 This method is normally called by L<DBIx::Class::Schema/connection>, which
123 encapsulates its argument list in an arrayref before passing them here.
124
125 The argument list may contain:
126
127 =over
128
129 =item *
130
131 The same 4-element argument set one would normally pass to
132 L<DBI/connect>, optionally followed by
133 L<extra attributes|/DBIx::Class specific connection attributes>
134 recognized by DBIx::Class:
135
136   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
137
138 =item *
139
140 A single code reference which returns a connected
141 L<DBI database handle|DBI/connect> optionally followed by
142 L<extra attributes|/DBIx::Class specific connection attributes> recognized
143 by DBIx::Class:
144
145   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
146
147 =item *
148
149 A single hashref with all the attributes and the dsn/user/password
150 mixed together:
151
152   $connect_info_args = [{
153     dsn => $dsn,
154     user => $user,
155     password => $pass,
156     %dbi_attributes,
157     %extra_attributes,
158   }];
159
160   $connect_info_args = [{
161     dbh_maker => sub { DBI->connect (...) },
162     %dbi_attributes,
163     %extra_attributes,
164   }];
165
166 This is particularly useful for L<Catalyst> based applications, allowing the
167 following config (L<Config::General> style):
168
169   <Model::DB>
170     schema_class   App::DB
171     <connect_info>
172       dsn          dbi:mysql:database=test
173       user         testuser
174       password     TestPass
175       AutoCommit   1
176     </connect_info>
177   </Model::DB>
178
179 The C<dsn>/C<user>/C<password> combination can be substituted by the
180 C<dbh_maker> key whose value is a coderef that returns a connected
181 L<DBI database handle|DBI/connect>
182
183 =back
184
185 Please note that the L<DBI> docs recommend that you always explicitly
186 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
187 recommends that it be set to I<1>, and that you perform transactions
188 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
189 to I<1> if you do not do explicitly set it to zero.  This is the default
190 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
191
192 =head3 DBIx::Class specific connection attributes
193
194 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
195 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
196 the following connection options. These options can be mixed in with your other
197 L<DBI> connection attributes, or placed in a seperate hashref
198 (C<\%extra_attributes>) as shown above.
199
200 Every time C<connect_info> is invoked, any previous settings for
201 these options will be cleared before setting the new ones, regardless of
202 whether any options are specified in the new C<connect_info>.
203
204
205 =over
206
207 =item on_connect_do
208
209 Specifies things to do immediately after connecting or re-connecting to
210 the database.  Its value may contain:
211
212 =over
213
214 =item a scalar
215
216 This contains one SQL statement to execute.
217
218 =item an array reference
219
220 This contains SQL statements to execute in order.  Each element contains
221 a string or a code reference that returns a string.
222
223 =item a code reference
224
225 This contains some code to execute.  Unlike code references within an
226 array reference, its return value is ignored.
227
228 =back
229
230 =item on_disconnect_do
231
232 Takes arguments in the same form as L</on_connect_do> and executes them
233 immediately before disconnecting from the database.
234
235 Note, this only runs if you explicitly call L</disconnect> on the
236 storage object.
237
238 =item on_connect_call
239
240 A more generalized form of L</on_connect_do> that calls the specified
241 C<connect_call_METHOD> methods in your storage driver.
242
243   on_connect_do => 'select 1'
244
245 is equivalent to:
246
247   on_connect_call => [ [ do_sql => 'select 1' ] ]
248
249 Its values may contain:
250
251 =over
252
253 =item a scalar
254
255 Will call the C<connect_call_METHOD> method.
256
257 =item a code reference
258
259 Will execute C<< $code->($storage) >>
260
261 =item an array reference
262
263 Each value can be a method name or code reference.
264
265 =item an array of arrays
266
267 For each array, the first item is taken to be the C<connect_call_> method name
268 or code reference, and the rest are parameters to it.
269
270 =back
271
272 Some predefined storage methods you may use:
273
274 =over
275
276 =item do_sql
277
278 Executes a SQL string or a code reference that returns a SQL string. This is
279 what L</on_connect_do> and L</on_disconnect_do> use.
280
281 It can take:
282
283 =over
284
285 =item a scalar
286
287 Will execute the scalar as SQL.
288
289 =item an arrayref
290
291 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
292 attributes hashref and bind values.
293
294 =item a code reference
295
296 Will execute C<< $code->($storage) >> and execute the return array refs as
297 above.
298
299 =back
300
301 =item datetime_setup
302
303 Execute any statements necessary to initialize the database session to return
304 and accept datetime/timestamp values used with
305 L<DBIx::Class::InflateColumn::DateTime>.
306
307 Only necessary for some databases, see your specific storage driver for
308 implementation details.
309
310 =back
311
312 =item on_disconnect_call
313
314 Takes arguments in the same form as L</on_connect_call> and executes them
315 immediately before disconnecting from the database.
316
317 Calls the C<disconnect_call_METHOD> methods as opposed to the
318 C<connect_call_METHOD> methods called by L</on_connect_call>.
319
320 Note, this only runs if you explicitly call L</disconnect> on the
321 storage object.
322
323 =item disable_sth_caching
324
325 If set to a true value, this option will disable the caching of
326 statement handles via L<DBI/prepare_cached>.
327
328 =item limit_dialect
329
330 Sets the limit dialect. This is useful for JDBC-bridge among others
331 where the remote SQL-dialect cannot be determined by the name of the
332 driver alone. See also L<SQL::Abstract::Limit>.
333
334 =item quote_char
335
336 Specifies what characters to use to quote table and column names. If
337 you use this you will want to specify L</name_sep> as well.
338
339 C<quote_char> expects either a single character, in which case is it
340 is placed on either side of the table/column name, or an arrayref of length
341 2 in which case the table/column name is placed between the elements.
342
343 For example under MySQL you should use C<< quote_char => '`' >>, and for
344 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
345
346 =item name_sep
347
348 This only needs to be used in conjunction with C<quote_char>, and is used to
349 specify the charecter that seperates elements (schemas, tables, columns) from
350 each other. In most cases this is simply a C<.>.
351
352 The consequences of not supplying this value is that L<SQL::Abstract>
353 will assume DBIx::Class' uses of aliases to be complete column
354 names. The output will look like I<"me.name"> when it should actually
355 be I<"me"."name">.
356
357 =item unsafe
358
359 This Storage driver normally installs its own C<HandleError>, sets
360 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
361 all database handles, including those supplied by a coderef.  It does this
362 so that it can have consistent and useful error behavior.
363
364 If you set this option to a true value, Storage will not do its usual
365 modifications to the database handle's attributes, and instead relies on
366 the settings in your connect_info DBI options (or the values you set in
367 your connection coderef, in the case that you are connecting via coderef).
368
369 Note that your custom settings can cause Storage to malfunction,
370 especially if you set a C<HandleError> handler that suppresses exceptions
371 and/or disable C<RaiseError>.
372
373 =item auto_savepoint
374
375 If this option is true, L<DBIx::Class> will use savepoints when nesting
376 transactions, making it possible to recover from failure in the inner
377 transaction without having to abort all outer transactions.
378
379 =item cursor_class
380
381 Use this argument to supply a cursor class other than the default
382 L<DBIx::Class::Storage::DBI::Cursor>.
383
384 =back
385
386 Some real-life examples of arguments to L</connect_info> and
387 L<DBIx::Class::Schema/connect>
388
389   # Simple SQLite connection
390   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
391
392   # Connect via subref
393   ->connect_info([ sub { DBI->connect(...) } ]);
394
395   # Connect via subref in hashref
396   ->connect_info([{
397     dbh_maker => sub { DBI->connect(...) },
398     on_connect_do => 'alter session ...',
399   }]);
400
401   # A bit more complicated
402   ->connect_info(
403     [
404       'dbi:Pg:dbname=foo',
405       'postgres',
406       'my_pg_password',
407       { AutoCommit => 1 },
408       { quote_char => q{"}, name_sep => q{.} },
409     ]
410   );
411
412   # Equivalent to the previous example
413   ->connect_info(
414     [
415       'dbi:Pg:dbname=foo',
416       'postgres',
417       'my_pg_password',
418       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
419     ]
420   );
421
422   # Same, but with hashref as argument
423   # See parse_connect_info for explanation
424   ->connect_info(
425     [{
426       dsn         => 'dbi:Pg:dbname=foo',
427       user        => 'postgres',
428       password    => 'my_pg_password',
429       AutoCommit  => 1,
430       quote_char  => q{"},
431       name_sep    => q{.},
432     }]
433   );
434
435   # Subref + DBIx::Class-specific connection options
436   ->connect_info(
437     [
438       sub { DBI->connect(...) },
439       {
440           quote_char => q{`},
441           name_sep => q{@},
442           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
443           disable_sth_caching => 1,
444       },
445     ]
446   );
447
448
449
450 =cut
451
452 sub connect_info {
453   my ($self, $info_arg) = @_;
454
455   return $self->_connect_info if !$info_arg;
456
457   my @args = @$info_arg;  # take a shallow copy for further mutilation
458   $self->_connect_info([@args]); # copy for _connect_info
459
460
461   # combine/pre-parse arguments depending on invocation style
462
463   my %attrs;
464   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
465     %attrs = %{ $args[1] || {} };
466     @args = $args[0];
467   }
468   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
469     %attrs = %{$args[0]};
470     @args = ();
471     if (my $code = delete $attrs{dbh_maker}) {
472       @args = $code;
473
474       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
475       if (@ignored) {
476         carp sprintf (
477             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
478           . "to the result of 'dbh_maker'",
479
480           join (', ', map { "'$_'" } (@ignored) ),
481         );
482       }
483     }
484     else {
485       @args = delete @attrs{qw/dsn user password/};
486     }
487   }
488   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
489     %attrs = (
490       % { $args[3] || {} },
491       % { $args[4] || {} },
492     );
493     @args = @args[0,1,2];
494   }
495
496   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
497   #  the new set of options
498   $self->_sql_maker(undef);
499   $self->_sql_maker_opts({});
500
501   if(keys %attrs) {
502     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
503       if(my $value = delete $attrs{$storage_opt}) {
504         $self->$storage_opt($value);
505       }
506     }
507     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
508       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
509         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
510       }
511     }
512   }
513
514   if (ref $args[0] eq 'CODE') {
515     # _connect() never looks past $args[0] in this case
516     %attrs = ()
517   } else {
518     %attrs = (
519       %{ $self->_default_dbi_connect_attributes || {} },
520       %attrs,
521     );
522   }
523
524   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
525   $self->_connect_info;
526 }
527
528 sub _default_dbi_connect_attributes {
529   return {
530     AutoCommit => 1,
531     RaiseError => 1,
532     PrintError => 0,
533   };
534 }
535
536 =head2 on_connect_do
537
538 This method is deprecated in favour of setting via L</connect_info>.
539
540 =cut
541
542 =head2 on_disconnect_do
543
544 This method is deprecated in favour of setting via L</connect_info>.
545
546 =cut
547
548 sub _parse_connect_do {
549   my ($self, $type) = @_;
550
551   my $val = $self->$type;
552   return () if not defined $val;
553
554   my @res;
555
556   if (not ref($val)) {
557     push @res, [ 'do_sql', $val ];
558   } elsif (ref($val) eq 'CODE') {
559     push @res, $val;
560   } elsif (ref($val) eq 'ARRAY') {
561     push @res, map { [ 'do_sql', $_ ] } @$val;
562   } else {
563     $self->throw_exception("Invalid type for $type: ".ref($val));
564   }
565
566   return \@res;
567 }
568
569 =head2 dbh_do
570
571 Arguments: ($subref | $method_name), @extra_coderef_args?
572
573 Execute the given $subref or $method_name using the new exception-based
574 connection management.
575
576 The first two arguments will be the storage object that C<dbh_do> was called
577 on and a database handle to use.  Any additional arguments will be passed
578 verbatim to the called subref as arguments 2 and onwards.
579
580 Using this (instead of $self->_dbh or $self->dbh) ensures correct
581 exception handling and reconnection (or failover in future subclasses).
582
583 Your subref should have no side-effects outside of the database, as
584 there is the potential for your subref to be partially double-executed
585 if the database connection was stale/dysfunctional.
586
587 Example:
588
589   my @stuff = $schema->storage->dbh_do(
590     sub {
591       my ($storage, $dbh, @cols) = @_;
592       my $cols = join(q{, }, @cols);
593       $dbh->selectrow_array("SELECT $cols FROM foo");
594     },
595     @column_list
596   );
597
598 =cut
599
600 sub dbh_do {
601   my $self = shift;
602   my $code = shift;
603
604   my $dbh = $self->_get_dbh;
605
606   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
607       || $self->{transaction_depth};
608
609   local $self->{_in_dbh_do} = 1;
610
611   my @result;
612   my $want_array = wantarray;
613
614   eval {
615
616     if($want_array) {
617         @result = $self->$code($dbh, @_);
618     }
619     elsif(defined $want_array) {
620         $result[0] = $self->$code($dbh, @_);
621     }
622     else {
623         $self->$code($dbh, @_);
624     }
625   };
626
627   # ->connected might unset $@ - copy
628   my $exception = $@;
629   if(!$exception) { return $want_array ? @result : $result[0] }
630
631   $self->throw_exception($exception) if $self->connected;
632
633   # We were not connected - reconnect and retry, but let any
634   #  exception fall right through this time
635   carp "Retrying $code after catching disconnected exception: $exception"
636     if $ENV{DBIC_DBIRETRY_DEBUG};
637   $self->_populate_dbh;
638   $self->$code($self->_dbh, @_);
639 }
640
641 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
642 # It also informs dbh_do to bypass itself while under the direction of txn_do,
643 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
644 sub txn_do {
645   my $self = shift;
646   my $coderef = shift;
647
648   ref $coderef eq 'CODE' or $self->throw_exception
649     ('$coderef must be a CODE reference');
650
651   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
652
653   local $self->{_in_dbh_do} = 1;
654
655   my @result;
656   my $want_array = wantarray;
657
658   my $tried = 0;
659   while(1) {
660     eval {
661       $self->_get_dbh;
662
663       $self->txn_begin;
664       if($want_array) {
665           @result = $coderef->(@_);
666       }
667       elsif(defined $want_array) {
668           $result[0] = $coderef->(@_);
669       }
670       else {
671           $coderef->(@_);
672       }
673       $self->txn_commit;
674     };
675
676     # ->connected might unset $@ - copy
677     my $exception = $@;
678     if(!$exception) { return $want_array ? @result : $result[0] }
679
680     if($tried++ || $self->connected) {
681       eval { $self->txn_rollback };
682       my $rollback_exception = $@;
683       if($rollback_exception) {
684         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
685         $self->throw_exception($exception)  # propagate nested rollback
686           if $rollback_exception =~ /$exception_class/;
687
688         $self->throw_exception(
689           "Transaction aborted: ${exception}. "
690           . "Rollback failed: ${rollback_exception}"
691         );
692       }
693       $self->throw_exception($exception)
694     }
695
696     # We were not connected, and was first try - reconnect and retry
697     # via the while loop
698     carp "Retrying $coderef after catching disconnected exception: $exception"
699       if $ENV{DBIC_DBIRETRY_DEBUG};
700     $self->_populate_dbh;
701   }
702 }
703
704 =head2 disconnect
705
706 Our C<disconnect> method also performs a rollback first if the
707 database is not in C<AutoCommit> mode.
708
709 =cut
710
711 sub disconnect {
712   my ($self) = @_;
713
714   if( $self->_dbh ) {
715     my @actions;
716
717     push @actions, ( $self->on_disconnect_call || () );
718     push @actions, $self->_parse_connect_do ('on_disconnect_do');
719
720     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
721
722     $self->_dbh_rollback unless $self->_dbh_autocommit;
723
724     $self->_dbh->disconnect;
725     $self->_dbh(undef);
726     $self->{_dbh_gen}++;
727   }
728 }
729
730 =head2 with_deferred_fk_checks
731
732 =over 4
733
734 =item Arguments: C<$coderef>
735
736 =item Return Value: The return value of $coderef
737
738 =back
739
740 Storage specific method to run the code ref with FK checks deferred or
741 in MySQL's case disabled entirely.
742
743 =cut
744
745 # Storage subclasses should override this
746 sub with_deferred_fk_checks {
747   my ($self, $sub) = @_;
748   $sub->();
749 }
750
751 =head2 connected
752
753 =over
754
755 =item Arguments: none
756
757 =item Return Value: 1|0
758
759 =back
760
761 Verifies that the the current database handle is active and ready to execute
762 an SQL statement (i.e. the connection did not get stale, server is still
763 answering, etc.) This method is used internally by L</dbh>.
764
765 =cut
766
767 sub connected {
768   my $self = shift;
769   return 0 unless $self->_seems_connected;
770
771   #be on the safe side
772   local $self->_dbh->{RaiseError} = 1;
773
774   return $self->_ping;
775 }
776
777 sub _seems_connected {
778   my $self = shift;
779
780   my $dbh = $self->_dbh
781     or return 0;
782
783   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
784     $self->_dbh(undef);
785     $self->{_dbh_gen}++;
786     return 0;
787   }
788   else {
789     $self->_verify_pid;
790     return 0 if !$self->_dbh;
791   }
792
793   return $dbh->FETCH('Active');
794 }
795
796 sub _ping {
797   my $self = shift;
798
799   my $dbh = $self->_dbh or return 0;
800
801   return $dbh->ping;
802 }
803
804 # handle pid changes correctly
805 #  NOTE: assumes $self->_dbh is a valid $dbh
806 sub _verify_pid {
807   my ($self) = @_;
808
809   return if defined $self->_conn_pid && $self->_conn_pid == $$;
810
811   $self->_dbh->{InactiveDestroy} = 1;
812   $self->_dbh(undef);
813   $self->{_dbh_gen}++;
814
815   return;
816 }
817
818 sub ensure_connected {
819   my ($self) = @_;
820
821   unless ($self->connected) {
822     $self->_populate_dbh;
823   }
824 }
825
826 =head2 dbh
827
828 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
829 is guaranteed to be healthy by implicitly calling L</connected>, and if
830 necessary performing a reconnection before returning. Keep in mind that this
831 is very B<expensive> on some database engines. Consider using L<dbh_do>
832 instead.
833
834 =cut
835
836 sub dbh {
837   my ($self) = @_;
838
839   if (not $self->_dbh) {
840     $self->_populate_dbh;
841   } else {
842     $self->ensure_connected;
843   }
844   return $self->_dbh;
845 }
846
847 # this is the internal "get dbh or connect (don't check)" method
848 sub _get_dbh {
849   my $self = shift;
850   $self->_verify_pid if $self->_dbh;
851   $self->_populate_dbh unless $self->_dbh;
852   return $self->_dbh;
853 }
854
855 sub _sql_maker_args {
856     my ($self) = @_;
857
858     return (
859       bindtype=>'columns',
860       array_datatypes => 1,
861       limit_dialect => $self->_get_dbh,
862       %{$self->_sql_maker_opts}
863     );
864 }
865
866 sub sql_maker {
867   my ($self) = @_;
868   unless ($self->_sql_maker) {
869     my $sql_maker_class = $self->sql_maker_class;
870     $self->ensure_class_loaded ($sql_maker_class);
871     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
872   }
873   return $self->_sql_maker;
874 }
875
876 # nothing to do by default
877 sub _rebless {}
878 sub _init {}
879
880 sub _populate_dbh {
881   my ($self) = @_;
882
883   my @info = @{$self->_dbi_connect_info || []};
884   $self->_dbh(undef); # in case ->connected failed we might get sent here
885   $self->_dbh($self->_connect(@info));
886
887   $self->_conn_pid($$);
888   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
889
890   $self->_determine_driver;
891
892   # Always set the transaction depth on connect, since
893   #  there is no transaction in progress by definition
894   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
895
896   $self->_run_connection_actions unless $self->{_in_determine_driver};
897 }
898
899 sub _run_connection_actions {
900   my $self = shift;
901   my @actions;
902
903   push @actions, ( $self->on_connect_call || () );
904   push @actions, $self->_parse_connect_do ('on_connect_do');
905
906   $self->_do_connection_actions(connect_call_ => $_) for @actions;
907 }
908
909 sub _determine_driver {
910   my ($self) = @_;
911
912   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
913     my $started_connected = 0;
914     local $self->{_in_determine_driver} = 1;
915
916     if (ref($self) eq __PACKAGE__) {
917       my $driver;
918       if ($self->_dbh) { # we are connected
919         $driver = $self->_dbh->{Driver}{Name};
920         $started_connected = 1;
921       } else {
922         # if connect_info is a CODEREF, we have no choice but to connect
923         if (ref $self->_dbi_connect_info->[0] &&
924             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
925           $self->_populate_dbh;
926           $driver = $self->_dbh->{Driver}{Name};
927         }
928         else {
929           # try to use dsn to not require being connected, the driver may still
930           # force a connection in _rebless to determine version
931           ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
932         }
933       }
934
935       my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
936       if ($self->load_optional_class($storage_class)) {
937         mro::set_mro($storage_class, 'c3');
938         bless $self, $storage_class;
939         $self->_rebless();
940       }
941     }
942
943     $self->_driver_determined(1);
944
945     $self->_init; # run driver-specific initializations
946
947     $self->_run_connection_actions
948         if !$started_connected && defined $self->_dbh;
949   }
950 }
951
952 sub _do_connection_actions {
953   my $self          = shift;
954   my $method_prefix = shift;
955   my $call          = shift;
956
957   if (not ref($call)) {
958     my $method = $method_prefix . $call;
959     $self->$method(@_);
960   } elsif (ref($call) eq 'CODE') {
961     $self->$call(@_);
962   } elsif (ref($call) eq 'ARRAY') {
963     if (ref($call->[0]) ne 'ARRAY') {
964       $self->_do_connection_actions($method_prefix, $_) for @$call;
965     } else {
966       $self->_do_connection_actions($method_prefix, @$_) for @$call;
967     }
968   } else {
969     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
970   }
971
972   return $self;
973 }
974
975 sub connect_call_do_sql {
976   my $self = shift;
977   $self->_do_query(@_);
978 }
979
980 sub disconnect_call_do_sql {
981   my $self = shift;
982   $self->_do_query(@_);
983 }
984
985 # override in db-specific backend when necessary
986 sub connect_call_datetime_setup { 1 }
987
988 sub _do_query {
989   my ($self, $action) = @_;
990
991   if (ref $action eq 'CODE') {
992     $action = $action->($self);
993     $self->_do_query($_) foreach @$action;
994   }
995   else {
996     # Most debuggers expect ($sql, @bind), so we need to exclude
997     # the attribute hash which is the second argument to $dbh->do
998     # furthermore the bind values are usually to be presented
999     # as named arrayref pairs, so wrap those here too
1000     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1001     my $sql = shift @do_args;
1002     my $attrs = shift @do_args;
1003     my @bind = map { [ undef, $_ ] } @do_args;
1004
1005     $self->_query_start($sql, @bind);
1006     $self->_get_dbh->do($sql, $attrs, @do_args);
1007     $self->_query_end($sql, @bind);
1008   }
1009
1010   return $self;
1011 }
1012
1013 sub _connect {
1014   my ($self, @info) = @_;
1015
1016   $self->throw_exception("You failed to provide any connection info")
1017     if !@info;
1018
1019   my ($old_connect_via, $dbh);
1020
1021   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1022     $old_connect_via = $DBI::connect_via;
1023     $DBI::connect_via = 'connect';
1024   }
1025
1026   eval {
1027     if(ref $info[0] eq 'CODE') {
1028        $dbh = &{$info[0]}
1029     }
1030     else {
1031        $dbh = DBI->connect(@info);
1032     }
1033
1034     if($dbh && !$self->unsafe) {
1035       my $weak_self = $self;
1036       Scalar::Util::weaken($weak_self);
1037       $dbh->{HandleError} = sub {
1038           if ($weak_self) {
1039             $weak_self->throw_exception("DBI Exception: $_[0]");
1040           }
1041           else {
1042             # the handler may be invoked by something totally out of
1043             # the scope of DBIC
1044             croak ("DBI Exception: $_[0]");
1045           }
1046       };
1047       $dbh->{ShowErrorStatement} = 1;
1048       $dbh->{RaiseError} = 1;
1049       $dbh->{PrintError} = 0;
1050     }
1051   };
1052
1053   $DBI::connect_via = $old_connect_via if $old_connect_via;
1054
1055   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1056     if !$dbh || $@;
1057
1058   $self->_dbh_autocommit($dbh->{AutoCommit});
1059
1060   $dbh;
1061 }
1062
1063 sub svp_begin {
1064   my ($self, $name) = @_;
1065
1066   $name = $self->_svp_generate_name
1067     unless defined $name;
1068
1069   $self->throw_exception ("You can't use savepoints outside a transaction")
1070     if $self->{transaction_depth} == 0;
1071
1072   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1073     unless $self->can('_svp_begin');
1074
1075   push @{ $self->{savepoints} }, $name;
1076
1077   $self->debugobj->svp_begin($name) if $self->debug;
1078
1079   return $self->_svp_begin($name);
1080 }
1081
1082 sub svp_release {
1083   my ($self, $name) = @_;
1084
1085   $self->throw_exception ("You can't use savepoints outside a transaction")
1086     if $self->{transaction_depth} == 0;
1087
1088   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1089     unless $self->can('_svp_release');
1090
1091   if (defined $name) {
1092     $self->throw_exception ("Savepoint '$name' does not exist")
1093       unless grep { $_ eq $name } @{ $self->{savepoints} };
1094
1095     # Dig through the stack until we find the one we are releasing.  This keeps
1096     # the stack up to date.
1097     my $svp;
1098
1099     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1100   } else {
1101     $name = pop @{ $self->{savepoints} };
1102   }
1103
1104   $self->debugobj->svp_release($name) if $self->debug;
1105
1106   return $self->_svp_release($name);
1107 }
1108
1109 sub svp_rollback {
1110   my ($self, $name) = @_;
1111
1112   $self->throw_exception ("You can't use savepoints outside a transaction")
1113     if $self->{transaction_depth} == 0;
1114
1115   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1116     unless $self->can('_svp_rollback');
1117
1118   if (defined $name) {
1119       # If they passed us a name, verify that it exists in the stack
1120       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1121           $self->throw_exception("Savepoint '$name' does not exist!");
1122       }
1123
1124       # Dig through the stack until we find the one we are releasing.  This keeps
1125       # the stack up to date.
1126       while(my $s = pop(@{ $self->{savepoints} })) {
1127           last if($s eq $name);
1128       }
1129       # Add the savepoint back to the stack, as a rollback doesn't remove the
1130       # named savepoint, only everything after it.
1131       push(@{ $self->{savepoints} }, $name);
1132   } else {
1133       # We'll assume they want to rollback to the last savepoint
1134       $name = $self->{savepoints}->[-1];
1135   }
1136
1137   $self->debugobj->svp_rollback($name) if $self->debug;
1138
1139   return $self->_svp_rollback($name);
1140 }
1141
1142 sub _svp_generate_name {
1143     my ($self) = @_;
1144
1145     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1146 }
1147
1148 sub txn_begin {
1149   my $self = shift;
1150   if($self->{transaction_depth} == 0) {
1151     $self->debugobj->txn_begin()
1152       if $self->debug;
1153     $self->_dbh_begin_work;
1154   }
1155   elsif ($self->auto_savepoint) {
1156     $self->svp_begin;
1157   }
1158   $self->{transaction_depth}++;
1159 }
1160
1161 sub _dbh_begin_work {
1162   my $self = shift;
1163
1164   # if the user is utilizing txn_do - good for him, otherwise we need to
1165   # ensure that the $dbh is healthy on BEGIN.
1166   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1167   # will be replaced by a failure of begin_work itself (which will be
1168   # then retried on reconnect)
1169   if ($self->{_in_dbh_do}) {
1170     $self->_dbh->begin_work;
1171   } else {
1172     $self->dbh_do(sub { $_[1]->begin_work });
1173   }
1174 }
1175
1176 sub txn_commit {
1177   my $self = shift;
1178   if ($self->{transaction_depth} == 1) {
1179     $self->debugobj->txn_commit()
1180       if ($self->debug);
1181     $self->_dbh_commit;
1182     $self->{transaction_depth} = 0
1183       if $self->_dbh_autocommit;
1184   }
1185   elsif($self->{transaction_depth} > 1) {
1186     $self->{transaction_depth}--;
1187     $self->svp_release
1188       if $self->auto_savepoint;
1189   }
1190 }
1191
1192 sub _dbh_commit {
1193   my $self = shift;
1194   my $dbh  = $self->_dbh
1195     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1196   $dbh->commit;
1197 }
1198
1199 sub txn_rollback {
1200   my $self = shift;
1201   my $dbh = $self->_dbh;
1202   eval {
1203     if ($self->{transaction_depth} == 1) {
1204       $self->debugobj->txn_rollback()
1205         if ($self->debug);
1206       $self->{transaction_depth} = 0
1207         if $self->_dbh_autocommit;
1208       $self->_dbh_rollback;
1209     }
1210     elsif($self->{transaction_depth} > 1) {
1211       $self->{transaction_depth}--;
1212       if ($self->auto_savepoint) {
1213         $self->svp_rollback;
1214         $self->svp_release;
1215       }
1216     }
1217     else {
1218       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1219     }
1220   };
1221   if ($@) {
1222     my $error = $@;
1223     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1224     $error =~ /$exception_class/ and $self->throw_exception($error);
1225     # ensure that a failed rollback resets the transaction depth
1226     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1227     $self->throw_exception($error);
1228   }
1229 }
1230
1231 sub _dbh_rollback {
1232   my $self = shift;
1233   my $dbh  = $self->_dbh
1234     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1235   $dbh->rollback;
1236 }
1237
1238 # This used to be the top-half of _execute.  It was split out to make it
1239 #  easier to override in NoBindVars without duping the rest.  It takes up
1240 #  all of _execute's args, and emits $sql, @bind.
1241 sub _prep_for_execute {
1242   my ($self, $op, $extra_bind, $ident, $args) = @_;
1243
1244   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1245     $ident = $ident->from();
1246   }
1247
1248   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1249
1250   unshift(@bind,
1251     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1252       if $extra_bind;
1253   return ($sql, \@bind);
1254 }
1255
1256
1257 sub _fix_bind_params {
1258     my ($self, @bind) = @_;
1259
1260     ### Turn @bind from something like this:
1261     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1262     ### to this:
1263     ###   ( "'1'", "'1'", "'3'" )
1264     return
1265         map {
1266             if ( defined( $_ && $_->[1] ) ) {
1267                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1268             }
1269             else { q{'NULL'}; }
1270         } @bind;
1271 }
1272
1273 sub _query_start {
1274     my ( $self, $sql, @bind ) = @_;
1275
1276     if ( $self->debug ) {
1277         @bind = $self->_fix_bind_params(@bind);
1278
1279         $self->debugobj->query_start( $sql, @bind );
1280     }
1281 }
1282
1283 sub _query_end {
1284     my ( $self, $sql, @bind ) = @_;
1285
1286     if ( $self->debug ) {
1287         @bind = $self->_fix_bind_params(@bind);
1288         $self->debugobj->query_end( $sql, @bind );
1289     }
1290 }
1291
1292 sub _dbh_execute {
1293   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1294
1295   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1296
1297   $self->_query_start( $sql, @$bind );
1298
1299   my $sth = $self->sth($sql,$op);
1300
1301   my $placeholder_index = 1;
1302
1303   foreach my $bound (@$bind) {
1304     my $attributes = {};
1305     my($column_name, @data) = @$bound;
1306
1307     if ($bind_attributes) {
1308       $attributes = $bind_attributes->{$column_name}
1309       if defined $bind_attributes->{$column_name};
1310     }
1311
1312     foreach my $data (@data) {
1313       my $ref = ref $data;
1314       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1315
1316       $sth->bind_param($placeholder_index, $data, $attributes);
1317       $placeholder_index++;
1318     }
1319   }
1320
1321   # Can this fail without throwing an exception anyways???
1322   my $rv = $sth->execute();
1323   $self->throw_exception($sth->errstr) if !$rv;
1324
1325   $self->_query_end( $sql, @$bind );
1326
1327   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1328 }
1329
1330 sub _execute {
1331     my $self = shift;
1332     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1333 }
1334
1335 sub insert {
1336   my ($self, $source, $to_insert) = @_;
1337
1338   my $ident = $source->from;
1339   my $bind_attributes = $self->source_bind_attributes($source);
1340
1341   my $updated_cols = {};
1342
1343   foreach my $col ( $source->columns ) {
1344     if ( !defined $to_insert->{$col} ) {
1345       my $col_info = $source->column_info($col);
1346
1347       if ( $col_info->{auto_nextval} ) {
1348         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1349           'nextval',
1350           $col_info->{sequence} ||
1351             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
1352         );
1353       }
1354     }
1355   }
1356
1357   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1358
1359   return $updated_cols;
1360 }
1361
1362 ## Still not quite perfect, and EXPERIMENTAL
1363 ## Currently it is assumed that all values passed will be "normal", i.e. not
1364 ## scalar refs, or at least, all the same type as the first set, the statement is
1365 ## only prepped once.
1366 sub insert_bulk {
1367   my ($self, $source, $cols, $data) = @_;
1368
1369   my %colvalues;
1370   @colvalues{@$cols} = (0..$#$cols);
1371
1372   for my $i (0..$#$cols) {
1373     my $first_val = $data->[0][$i];
1374     next unless ref $first_val eq 'SCALAR';
1375
1376     $colvalues{ $cols->[$i] } = $first_val;
1377   }
1378
1379   # check for bad data and stringify stringifiable objects
1380   my $bad_slice = sub {
1381     my ($msg, $col_idx, $slice_idx) = @_;
1382     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1383       $msg,
1384       $cols->[$col_idx],
1385       do {
1386         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1387         Data::Dumper::Concise::Dumper({
1388           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1389         }),
1390       }
1391     );
1392   };
1393
1394   for my $datum_idx (0..$#$data) {
1395     my $datum = $data->[$datum_idx];
1396
1397     for my $col_idx (0..$#$cols) {
1398       my $val            = $datum->[$col_idx];
1399       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1400       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1401
1402       if ($is_literal_sql) {
1403         if (not ref $val) {
1404           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1405         }
1406         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1407           $bad_slice->("$reftype reference found where literal SQL expected",
1408             $col_idx, $datum_idx);
1409         }
1410         elsif ($$val ne $$sqla_bind){
1411           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1412             $col_idx, $datum_idx);
1413         }
1414       }
1415       elsif (my $reftype = ref $val) {
1416         require overload;
1417         if (overload::Method($val, '""')) {
1418           $datum->[$col_idx] = "".$val;
1419         }
1420         else {
1421           $bad_slice->("$reftype reference found where bind expected",
1422             $col_idx, $datum_idx);
1423         }
1424       }
1425     }
1426   }
1427
1428   my ($sql, $bind) = $self->_prep_for_execute (
1429     'insert', undef, $source, [\%colvalues]
1430   );
1431   my @bind = @$bind;
1432
1433   my $empty_bind = 1 if (not @bind) &&
1434     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1435
1436   if ((not @bind) && (not $empty_bind)) {
1437     $self->throw_exception(
1438       'Cannot insert_bulk without support for placeholders'
1439     );
1440   }
1441
1442   $self->_query_start( $sql, ['__BULK__'] );
1443   my $sth = $self->sth($sql);
1444
1445   my $rv = do {
1446     if ($empty_bind) {
1447       # bind_param_array doesn't work if there are no binds
1448       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1449     }
1450     else {
1451 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1452       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1453     }
1454   };
1455
1456   $self->_query_end( $sql, ['__BULK__'] );
1457
1458   return (wantarray ? ($rv, $sth, @bind) : $rv);
1459 }
1460
1461 sub _execute_array {
1462   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1463
1464   my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
1465
1466   ## This must be an arrayref, else nothing works!
1467   my $tuple_status = [];
1468
1469   ## Get the bind_attributes, if any exist
1470   my $bind_attributes = $self->source_bind_attributes($source);
1471
1472   ## Bind the values and execute
1473   my $placeholder_index = 1;
1474
1475   foreach my $bound (@$bind) {
1476
1477     my $attributes = {};
1478     my ($column_name, $data_index) = @$bound;
1479
1480     if( $bind_attributes ) {
1481       $attributes = $bind_attributes->{$column_name}
1482       if defined $bind_attributes->{$column_name};
1483     }
1484
1485     my @data = map { $_->[$data_index] } @$data;
1486
1487     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1488     $placeholder_index++;
1489   }
1490
1491   my $rv = eval {
1492     $self->_dbh_execute_array($sth, $tuple_status, @extra);
1493   };
1494   my $err = $@ || $sth->errstr;
1495
1496 # Statement must finish even if there was an exception.
1497   eval { $sth->finish };
1498   $err = $@ unless $err;
1499
1500   if ($err) {
1501     my $i = 0;
1502     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1503
1504     $self->throw_exception("Unexpected populate error: $err")
1505       if ($i > $#$tuple_status);
1506
1507     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1508       ($tuple_status->[$i][1] || $err),
1509       Data::Dumper::Concise::Dumper({
1510         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1511       }),
1512     );
1513   }
1514
1515   $guard->commit if $guard;
1516
1517   return $rv;
1518 }
1519
1520 sub _dbh_execute_array {
1521     my ($self, $sth, $tuple_status, @extra) = @_;
1522
1523     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1524 }
1525
1526 sub _dbh_execute_inserts_with_no_binds {
1527   my ($self, $sth, $count) = @_;
1528
1529   my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
1530
1531   eval {
1532     my $dbh = $self->_get_dbh;
1533     local $dbh->{RaiseError} = 1;
1534     local $dbh->{PrintError} = 0;
1535
1536     $sth->execute foreach 1..$count;
1537   };
1538   my $exception = $@;
1539
1540 # Make sure statement is finished even if there was an exception.
1541   eval { $sth->finish };
1542   $exception = $@ unless $exception;
1543
1544   $self->throw_exception($exception) if $exception;
1545
1546   $guard->commit if $guard;
1547
1548   return $count;
1549 }
1550
1551 sub update {
1552   my ($self, $source, @args) = @_; 
1553
1554   my $bind_attributes = $self->source_bind_attributes($source);
1555
1556   return $self->_execute('update' => [], $source, $bind_attributes, @args);
1557 }
1558
1559
1560 sub delete {
1561   my ($self, $source, @args) = @_; 
1562
1563   my $bind_attrs = $self->source_bind_attributes($source);
1564
1565   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1566 }
1567
1568 # We were sent here because the $rs contains a complex search
1569 # which will require a subquery to select the correct rows
1570 # (i.e. joined or limited resultsets)
1571 #
1572 # Genarating a single PK column subquery is trivial and supported
1573 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1574 # Look at _multipk_update_delete()
1575 sub _subq_update_delete {
1576   my $self = shift;
1577   my ($rs, $op, $values) = @_;
1578
1579   my $rsrc = $rs->result_source;
1580
1581   # we already check this, but double check naively just in case. Should be removed soon
1582   my $sel = $rs->_resolved_attrs->{select};
1583   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1584   my @pcols = $rsrc->primary_columns;
1585   if (@$sel != @pcols) {
1586     $self->throw_exception (
1587       'Subquery update/delete can not be called on resultsets selecting a'
1588      .' number of columns different than the number of primary keys'
1589     );
1590   }
1591
1592   if (@pcols == 1) {
1593     return $self->$op (
1594       $rsrc,
1595       $op eq 'update' ? $values : (),
1596       { $pcols[0] => { -in => $rs->as_query } },
1597     );
1598   }
1599
1600   else {
1601     return $self->_multipk_update_delete (@_);
1602   }
1603 }
1604
1605 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1606 # resultset update/delete involving subqueries. So by default resort
1607 # to simple (and inefficient) delete_all style per-row opearations,
1608 # while allowing specific storages to override this with a faster
1609 # implementation.
1610 #
1611 sub _multipk_update_delete {
1612   return shift->_per_row_update_delete (@_);
1613 }
1614
1615 # This is the default loop used to delete/update rows for multi PK
1616 # resultsets, and used by mysql exclusively (because it can't do anything
1617 # else).
1618 #
1619 # We do not use $row->$op style queries, because resultset update/delete
1620 # is not expected to cascade (this is what delete_all/update_all is for).
1621 #
1622 # There should be no race conditions as the entire operation is rolled
1623 # in a transaction.
1624 #
1625 sub _per_row_update_delete {
1626   my $self = shift;
1627   my ($rs, $op, $values) = @_;
1628
1629   my $rsrc = $rs->result_source;
1630   my @pcols = $rsrc->primary_columns;
1631
1632   my $guard = $self->txn_scope_guard;
1633
1634   # emulate the return value of $sth->execute for non-selects
1635   my $row_cnt = '0E0';
1636
1637   my $subrs_cur = $rs->cursor;
1638   while (my @pks = $subrs_cur->next) {
1639
1640     my $cond;
1641     for my $i (0.. $#pcols) {
1642       $cond->{$pcols[$i]} = $pks[$i];
1643     }
1644
1645     $self->$op (
1646       $rsrc,
1647       $op eq 'update' ? $values : (),
1648       $cond,
1649     );
1650
1651     $row_cnt++;
1652   }
1653
1654   $guard->commit;
1655
1656   return $row_cnt;
1657 }
1658
1659 sub _select {
1660   my $self = shift;
1661
1662   # localization is neccessary as
1663   # 1) there is no infrastructure to pass this around before SQLA2
1664   # 2) _select_args sets it and _prep_for_execute consumes it
1665   my $sql_maker = $self->sql_maker;
1666   local $sql_maker->{_dbic_rs_attrs};
1667
1668   return $self->_execute($self->_select_args(@_));
1669 }
1670
1671 sub _select_args_to_query {
1672   my $self = shift;
1673
1674   # localization is neccessary as
1675   # 1) there is no infrastructure to pass this around before SQLA2
1676   # 2) _select_args sets it and _prep_for_execute consumes it
1677   my $sql_maker = $self->sql_maker;
1678   local $sql_maker->{_dbic_rs_attrs};
1679
1680   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1681   #  = $self->_select_args($ident, $select, $cond, $attrs);
1682   my ($op, $bind, $ident, $bind_attrs, @args) =
1683     $self->_select_args(@_);
1684
1685   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1686   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1687   $prepared_bind ||= [];
1688
1689   return wantarray
1690     ? ($sql, $prepared_bind, $bind_attrs)
1691     : \[ "($sql)", @$prepared_bind ]
1692   ;
1693 }
1694
1695 sub _select_args {
1696   my ($self, $ident, $select, $where, $attrs) = @_;
1697
1698   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1699
1700   my $sql_maker = $self->sql_maker;
1701   $sql_maker->{_dbic_rs_attrs} = {
1702     %$attrs,
1703     select => $select,
1704     from => $ident,
1705     where => $where,
1706     $rs_alias
1707       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1708       : ()
1709     ,
1710   };
1711
1712   # calculate bind_attrs before possible $ident mangling
1713   my $bind_attrs = {};
1714   for my $alias (keys %$alias2source) {
1715     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1716     for my $col (keys %$bindtypes) {
1717
1718       my $fqcn = join ('.', $alias, $col);
1719       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1720
1721       # Unqialified column names are nice, but at the same time can be
1722       # rather ambiguous. What we do here is basically go along with
1723       # the loop, adding an unqualified column slot to $bind_attrs,
1724       # alongside the fully qualified name. As soon as we encounter
1725       # another column by that name (which would imply another table)
1726       # we unset the unqualified slot and never add any info to it
1727       # to avoid erroneous type binding. If this happens the users
1728       # only choice will be to fully qualify his column name
1729
1730       if (exists $bind_attrs->{$col}) {
1731         $bind_attrs->{$col} = {};
1732       }
1733       else {
1734         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1735       }
1736     }
1737   }
1738
1739   # adjust limits
1740   if (
1741     $attrs->{software_limit}
1742       ||
1743     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1744   ) {
1745     $attrs->{software_limit} = 1;
1746   }
1747   else {
1748     $self->throw_exception("rows attribute must be positive if present")
1749       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1750
1751     # MySQL actually recommends this approach.  I cringe.
1752     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1753   }
1754
1755   my @limit;
1756
1757   # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
1758   # otherwise delegate the limiting to the storage, unless software limit was requested
1759   if (
1760     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1761        ||
1762     ( $attrs->{group_by} && @{$attrs->{group_by}} &&
1763       $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
1764   ) {
1765     ($ident, $select, $where, $attrs)
1766       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1767   }
1768   elsif (! $attrs->{software_limit} ) {
1769     push @limit, $attrs->{rows}, $attrs->{offset};
1770   }
1771
1772 ###
1773   # This would be the point to deflate anything found in $where
1774   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1775   # expect a row object. And all we have is a resultsource (it is trivial
1776   # to extract deflator coderefs via $alias2source above).
1777   #
1778   # I don't see a way forward other than changing the way deflators are
1779   # invoked, and that's just bad...
1780 ###
1781
1782   my $order = { map
1783     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1784     (qw/order_by group_by having/ )
1785   };
1786
1787   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1788 }
1789
1790 #
1791 # This is the code producing joined subqueries like:
1792 # SELECT me.*, other.* FROM ( SELECT me.* FROM ... ) JOIN other ON ... 
1793 #
1794 sub _adjust_select_args_for_complex_prefetch {
1795   my ($self, $from, $select, $where, $attrs) = @_;
1796
1797   $self->throw_exception ('Nothing to prefetch... how did we get here?!')
1798     if not @{$attrs->{_prefetch_select}};
1799
1800   $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
1801     if (ref $from ne 'ARRAY' || ref $from->[0] ne 'HASH' || ref $from->[1] ne 'ARRAY');
1802
1803
1804   # generate inner/outer attribute lists, remove stuff that doesn't apply
1805   my $outer_attrs = { %$attrs };
1806   delete $outer_attrs->{$_} for qw/where bind rows offset group_by having/;
1807
1808   my $inner_attrs = { %$attrs };
1809   delete $inner_attrs->{$_} for qw/for collapse _prefetch_select _collapse_order_by select as/;
1810
1811
1812   # bring over all non-collapse-induced order_by into the inner query (if any)
1813   # the outer one will have to keep them all
1814   delete $inner_attrs->{order_by};
1815   if (my $ord_cnt = @{$outer_attrs->{order_by}} - @{$outer_attrs->{_collapse_order_by}} ) {
1816     $inner_attrs->{order_by} = [
1817       @{$outer_attrs->{order_by}}[ 0 .. $ord_cnt - 1]
1818     ];
1819   }
1820
1821
1822   # generate the inner/outer select lists
1823   # for inside we consider only stuff *not* brought in by the prefetch
1824   # on the outside we substitute any function for its alias
1825   my $outer_select = [ @$select ];
1826   my $inner_select = [];
1827   for my $i (0 .. ( @$outer_select - @{$outer_attrs->{_prefetch_select}} - 1) ) {
1828     my $sel = $outer_select->[$i];
1829
1830     if (ref $sel eq 'HASH' ) {
1831       $sel->{-as} ||= $attrs->{as}[$i];
1832       $outer_select->[$i] = join ('.', $attrs->{alias}, ($sel->{-as} || "inner_column_$i") );
1833     }
1834
1835     push @$inner_select, $sel;
1836   }
1837
1838   # normalize a copy of $from, so it will be easier to work with further
1839   # down (i.e. promote the initial hashref to an AoH)
1840   $from = [ @$from ];
1841   $from->[0] = [ $from->[0] ];
1842   my %original_join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
1843
1844
1845   # decide which parts of the join will remain in either part of
1846   # the outer/inner query
1847
1848   # First we compose a list of which aliases are used in restrictions
1849   # (i.e. conditions/order/grouping/etc). Since we do not have
1850   # introspectable SQLA, we fall back to ugly scanning of raw SQL for
1851   # WHERE, and for pieces of ORDER BY in order to determine which aliases
1852   # need to appear in the resulting sql.
1853   # It may not be very efficient, but it's a reasonable stop-gap
1854   # Also unqualified column names will not be considered, but more often
1855   # than not this is actually ok
1856   #
1857   # In the same loop we enumerate part of the selection aliases, as
1858   # it requires the same sqla hack for the time being
1859   my ($restrict_aliases, $select_aliases, $prefetch_aliases);
1860   {
1861     # produce stuff unquoted, so it can be scanned
1862     my $sql_maker = $self->sql_maker;
1863     local $sql_maker->{quote_char};
1864     my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1865     $sep = "\Q$sep\E";
1866
1867     my $non_prefetch_select_sql = $sql_maker->_recurse_fields ($inner_select);
1868     my $prefetch_select_sql = $sql_maker->_recurse_fields ($outer_attrs->{_prefetch_select});
1869     my $where_sql = $sql_maker->where ($where);
1870     my $group_by_sql = $sql_maker->_order_by({
1871       map { $_ => $inner_attrs->{$_} } qw/group_by having/
1872     });
1873     my @non_prefetch_order_by_chunks = (map
1874       { ref $_ ? $_->[0] : $_ }
1875       $sql_maker->_order_by_chunks ($inner_attrs->{order_by})
1876     );
1877
1878
1879     for my $alias (keys %original_join_info) {
1880       my $seen_re = qr/\b $alias $sep/x;
1881
1882       for my $piece ($where_sql, $group_by_sql, @non_prefetch_order_by_chunks ) {
1883         if ($piece =~ $seen_re) {
1884           $restrict_aliases->{$alias} = 1;
1885         }
1886       }
1887
1888       if ($non_prefetch_select_sql =~ $seen_re) {
1889           $select_aliases->{$alias} = 1;
1890       }
1891
1892       if ($prefetch_select_sql =~ $seen_re) {
1893           $prefetch_aliases->{$alias} = 1;
1894       }
1895
1896     }
1897   }
1898
1899   # Add any non-left joins to the restriction list (such joins are indeed restrictions)
1900   for my $j (values %original_join_info) {
1901     my $alias = $j->{-alias} or next;
1902     $restrict_aliases->{$alias} = 1 if (
1903       (not $j->{-join_type})
1904         or
1905       ($j->{-join_type} !~ /^left (?: \s+ outer)? $/xi)
1906     );
1907   }
1908
1909   # mark all join parents as mentioned
1910   # (e.g.  join => { cds => 'tracks' } - tracks will need to bring cds too )
1911   for my $collection ($restrict_aliases, $select_aliases) {
1912     for my $alias (keys %$collection) {
1913       $collection->{$_} = 1
1914         for (@{ $original_join_info{$alias}{-join_path} || [] });
1915     }
1916   }
1917
1918   # construct the inner $from for the subquery
1919   my %inner_joins = (map { %{$_ || {}} } ($restrict_aliases, $select_aliases) );
1920   my @inner_from;
1921   for my $j (@$from) {
1922     push @inner_from, $j if $inner_joins{$j->[0]{-alias}};
1923   }
1924
1925   # if a multi-type join was needed in the subquery ("multi" is indicated by
1926   # presence in {collapse}) - add a group_by to simulate the collapse in the subq
1927   unless ($inner_attrs->{group_by}) {
1928     for my $alias (keys %inner_joins) {
1929
1930       # the dot comes from some weirdness in collapse
1931       # remove after the rewrite
1932       if ($attrs->{collapse}{".$alias"}) {
1933         $inner_attrs->{group_by} ||= $inner_select;
1934         last;
1935       }
1936     }
1937   }
1938
1939   # demote the inner_from head
1940   $inner_from[0] = $inner_from[0][0];
1941
1942   # generate the subquery
1943   my $subq = $self->_select_args_to_query (
1944     \@inner_from,
1945     $inner_select,
1946     $where,
1947     $inner_attrs,
1948   );
1949
1950   my $subq_joinspec = {
1951     -alias => $attrs->{alias},
1952     -source_handle => $inner_from[0]{-source_handle},
1953     $attrs->{alias} => $subq,
1954   };
1955
1956   # Generate the outer from - this is relatively easy (really just replace
1957   # the join slot with the subquery), with a major caveat - we can not
1958   # join anything that is non-selecting (not part of the prefetch), but at
1959   # the same time is a multi-type relationship, as it will explode the result.
1960   #
1961   # There are two possibilities here
1962   # - either the join is non-restricting, in which case we simply throw it away
1963   # - it is part of the restrictions, in which case we need to collapse the outer
1964   #   result by tackling yet another group_by to the outside of the query
1965
1966   # so first generate the outer_from, up to the substitution point
1967   my @outer_from;
1968   while (my $j = shift @$from) {
1969     if ($j->[0]{-alias} eq $attrs->{alias}) { # time to swap
1970       push @outer_from, [
1971         $subq_joinspec,
1972         @{$j}[1 .. $#$j],
1973       ];
1974       last; # we'll take care of what's left in $from below
1975     }
1976     else {
1977       push @outer_from, $j;
1978     }
1979   }
1980
1981   # see what's left - throw away if not selecting/restricting
1982   # also throw in a group_by if restricting to guard against
1983   # cross-join explosions
1984   #
1985   while (my $j = shift @$from) {
1986     my $alias = $j->[0]{-alias};
1987
1988     if ($select_aliases->{$alias} || $prefetch_aliases->{$alias}) {
1989       push @outer_from, $j;
1990     }
1991     elsif ($restrict_aliases->{$alias}) {
1992       push @outer_from, $j;
1993
1994       # FIXME - this should be obviated by SQLA2, as I'll be able to 
1995       # have restrict_inner and restrict_outer... or something to that
1996       # effect... I think...
1997
1998       # FIXME2 - I can't find a clean way to determine if a particular join
1999       # is a multi - instead I am just treating everything as a potential
2000       # explosive join (ribasushi)
2001       #
2002       # if (my $handle = $j->[0]{-source_handle}) {
2003       #   my $rsrc = $handle->resolve;
2004       #   ... need to bail out of the following if this is not a multi,
2005       #       as it will be much easier on the db ...
2006
2007           $outer_attrs->{group_by} ||= $outer_select;
2008       # }
2009     }
2010   }
2011
2012   # demote the outer_from head
2013   $outer_from[0] = $outer_from[0][0];
2014
2015   # This is totally horrific - the $where ends up in both the inner and outer query
2016   # Unfortunately not much can be done until SQLA2 introspection arrives, and even
2017   # then if where conditions apply to the *right* side of the prefetch, you may have
2018   # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
2019   # the outer select to exclude joins you didin't want in the first place
2020   #
2021   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
2022   return (\@outer_from, $outer_select, $where, $outer_attrs);
2023 }
2024
2025 sub _resolve_ident_sources {
2026   my ($self, $ident) = @_;
2027
2028   my $alias2source = {};
2029   my $rs_alias;
2030
2031   # the reason this is so contrived is that $ident may be a {from}
2032   # structure, specifying multiple tables to join
2033   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
2034     # this is compat mode for insert/update/delete which do not deal with aliases
2035     $alias2source->{me} = $ident;
2036     $rs_alias = 'me';
2037   }
2038   elsif (ref $ident eq 'ARRAY') {
2039
2040     for (@$ident) {
2041       my $tabinfo;
2042       if (ref $_ eq 'HASH') {
2043         $tabinfo = $_;
2044         $rs_alias = $tabinfo->{-alias};
2045       }
2046       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
2047         $tabinfo = $_->[0];
2048       }
2049
2050       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
2051         if ($tabinfo->{-source_handle});
2052     }
2053   }
2054
2055   return ($alias2source, $rs_alias);
2056 }
2057
2058 # Takes $ident, \@column_names
2059 #
2060 # returns { $column_name => \%column_info, ... }
2061 # also note: this adds -result_source => $rsrc to the column info
2062 #
2063 # usage:
2064 #   my $col_sources = $self->_resolve_column_info($ident, @column_names);
2065 sub _resolve_column_info {
2066   my ($self, $ident, $colnames) = @_;
2067   my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
2068
2069   my $sep = $self->_sql_maker_opts->{name_sep} || '.';
2070   $sep = "\Q$sep\E";
2071
2072   my (%return, %seen_cols);
2073
2074   # compile a global list of column names, to be able to properly
2075   # disambiguate unqualified column names (if at all possible)
2076   for my $alias (keys %$alias2src) {
2077     my $rsrc = $alias2src->{$alias};
2078     for my $colname ($rsrc->columns) {
2079       push @{$seen_cols{$colname}}, $alias;
2080     }
2081   }
2082
2083   COLUMN:
2084   foreach my $col (@$colnames) {
2085     my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
2086
2087     unless ($alias) {
2088       # see if the column was seen exactly once (so we know which rsrc it came from)
2089       if ($seen_cols{$colname} and @{$seen_cols{$colname}} == 1) {
2090         $alias = $seen_cols{$colname}[0];
2091       }
2092       else {
2093         next COLUMN;
2094       }
2095     }
2096
2097     my $rsrc = $alias2src->{$alias};
2098     $return{$col} = $rsrc && {
2099       %{$rsrc->column_info($colname)},
2100       -result_source => $rsrc,
2101       -source_alias => $alias,
2102     };
2103   }
2104
2105   return \%return;
2106 }
2107
2108 # Returns a counting SELECT for a simple count
2109 # query. Abstracted so that a storage could override
2110 # this to { count => 'firstcol' } or whatever makes
2111 # sense as a performance optimization
2112 sub _count_select {
2113   #my ($self, $source, $rs_attrs) = @_;
2114   return { count => '*' };
2115 }
2116
2117 # Returns a SELECT which will end up in the subselect
2118 # There may or may not be a group_by, as the subquery
2119 # might have been called to accomodate a limit
2120 #
2121 # Most databases would be happy with whatever ends up
2122 # here, but some choke in various ways.
2123 #
2124 sub _subq_count_select {
2125   my ($self, $source, $rs_attrs) = @_;
2126   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
2127
2128   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
2129   return @pcols ? \@pcols : [ 1 ];
2130 }
2131
2132 sub source_bind_attributes {
2133   my ($self, $source) = @_;
2134
2135   my $bind_attributes;
2136   foreach my $column ($source->columns) {
2137
2138     my $data_type = $source->column_info($column)->{data_type} || '';
2139     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2140      if $data_type;
2141   }
2142
2143   return $bind_attributes;
2144 }
2145
2146 =head2 select
2147
2148 =over 4
2149
2150 =item Arguments: $ident, $select, $condition, $attrs
2151
2152 =back
2153
2154 Handle a SQL select statement.
2155
2156 =cut
2157
2158 sub select {
2159   my $self = shift;
2160   my ($ident, $select, $condition, $attrs) = @_;
2161   return $self->cursor_class->new($self, \@_, $attrs);
2162 }
2163
2164 sub select_single {
2165   my $self = shift;
2166   my ($rv, $sth, @bind) = $self->_select(@_);
2167   my @row = $sth->fetchrow_array;
2168   my @nextrow = $sth->fetchrow_array if @row;
2169   if(@row && @nextrow) {
2170     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2171   }
2172   # Need to call finish() to work round broken DBDs
2173   $sth->finish();
2174   return @row;
2175 }
2176
2177 =head2 sth
2178
2179 =over 4
2180
2181 =item Arguments: $sql
2182
2183 =back
2184
2185 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2186
2187 =cut
2188
2189 sub _dbh_sth {
2190   my ($self, $dbh, $sql) = @_;
2191
2192   # 3 is the if_active parameter which avoids active sth re-use
2193   my $sth = $self->disable_sth_caching
2194     ? $dbh->prepare($sql)
2195     : $dbh->prepare_cached($sql, {}, 3);
2196
2197   # XXX You would think RaiseError would make this impossible,
2198   #  but apparently that's not true :(
2199   $self->throw_exception($dbh->errstr) if !$sth;
2200
2201   $sth;
2202 }
2203
2204 sub sth {
2205   my ($self, $sql) = @_;
2206   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2207 }
2208
2209 sub _dbh_columns_info_for {
2210   my ($self, $dbh, $table) = @_;
2211
2212   if ($dbh->can('column_info')) {
2213     my %result;
2214     eval {
2215       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2216       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2217       $sth->execute();
2218       while ( my $info = $sth->fetchrow_hashref() ){
2219         my %column_info;
2220         $column_info{data_type}   = $info->{TYPE_NAME};
2221         $column_info{size}      = $info->{COLUMN_SIZE};
2222         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2223         $column_info{default_value} = $info->{COLUMN_DEF};
2224         my $col_name = $info->{COLUMN_NAME};
2225         $col_name =~ s/^\"(.*)\"$/$1/;
2226
2227         $result{$col_name} = \%column_info;
2228       }
2229     };
2230     return \%result if !$@ && scalar keys %result;
2231   }
2232
2233   my %result;
2234   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2235   $sth->execute;
2236   my @columns = @{$sth->{NAME_lc}};
2237   for my $i ( 0 .. $#columns ){
2238     my %column_info;
2239     $column_info{data_type} = $sth->{TYPE}->[$i];
2240     $column_info{size} = $sth->{PRECISION}->[$i];
2241     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2242
2243     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2244       $column_info{data_type} = $1;
2245       $column_info{size}    = $2;
2246     }
2247
2248     $result{$columns[$i]} = \%column_info;
2249   }
2250   $sth->finish;
2251
2252   foreach my $col (keys %result) {
2253     my $colinfo = $result{$col};
2254     my $type_num = $colinfo->{data_type};
2255     my $type_name;
2256     if(defined $type_num && $dbh->can('type_info')) {
2257       my $type_info = $dbh->type_info($type_num);
2258       $type_name = $type_info->{TYPE_NAME} if $type_info;
2259       $colinfo->{data_type} = $type_name if $type_name;
2260     }
2261   }
2262
2263   return \%result;
2264 }
2265
2266 sub columns_info_for {
2267   my ($self, $table) = @_;
2268   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2269 }
2270
2271 =head2 last_insert_id
2272
2273 Return the row id of the last insert.
2274
2275 =cut
2276
2277 sub _dbh_last_insert_id {
2278     # All Storage's need to register their own _dbh_last_insert_id
2279     # the old SQLite-based method was highly inappropriate
2280
2281     my $self = shift;
2282     my $class = ref $self;
2283     $self->throw_exception (<<EOE);
2284
2285 No _dbh_last_insert_id() method found in $class.
2286 Since the method of obtaining the autoincrement id of the last insert
2287 operation varies greatly between different databases, this method must be
2288 individually implemented for every storage class.
2289 EOE
2290 }
2291
2292 sub last_insert_id {
2293   my $self = shift;
2294   $self->_dbh_last_insert_id ($self->_dbh, @_);
2295 }
2296
2297 =head2 _native_data_type
2298
2299 =over 4
2300
2301 =item Arguments: $type_name
2302
2303 =back
2304
2305 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2306 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2307 L<::Sybase|DBIx::Class::Storage::DBI::Sybase>.
2308
2309 The default implementation returns C<undef>, implement in your Storage driver if
2310 you need this functionality.
2311
2312 Should map types from other databases to the native RDBMS type, for example
2313 C<VARCHAR2> to C<VARCHAR>.
2314
2315 Types with modifiers should map to the underlying data type. For example,
2316 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2317
2318 Composite types should map to the container type, for example
2319 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2320
2321 =cut
2322
2323 sub _native_data_type {
2324   #my ($self, $data_type) = @_;
2325   return undef
2326 }
2327
2328 # Check if placeholders are supported at all
2329 sub _placeholders_supported {
2330   my $self = shift;
2331   my $dbh  = $self->_get_dbh;
2332
2333   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2334   # but it is inaccurate more often than not
2335   eval {
2336     local $dbh->{PrintError} = 0;
2337     local $dbh->{RaiseError} = 1;
2338     $dbh->do('select ?', {}, 1);
2339   };
2340   return $@ ? 0 : 1;
2341 }
2342
2343 # Check if placeholders bound to non-string types throw exceptions
2344 #
2345 sub _typeless_placeholders_supported {
2346   my $self = shift;
2347   my $dbh  = $self->_get_dbh;
2348
2349   eval {
2350     local $dbh->{PrintError} = 0;
2351     local $dbh->{RaiseError} = 1;
2352     # this specifically tests a bind that is NOT a string
2353     $dbh->do('select 1 where 1 = ?', {}, 1);
2354   };
2355   return $@ ? 0 : 1;
2356 }
2357
2358 =head2 sqlt_type
2359
2360 Returns the database driver name.
2361
2362 =cut
2363
2364 sub sqlt_type {
2365   shift->_get_dbh->{Driver}->{Name};
2366 }
2367
2368 =head2 bind_attribute_by_data_type
2369
2370 Given a datatype from column info, returns a database specific bind
2371 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2372 let the database planner just handle it.
2373
2374 Generally only needed for special case column types, like bytea in postgres.
2375
2376 =cut
2377
2378 sub bind_attribute_by_data_type {
2379     return;
2380 }
2381
2382 =head2 is_datatype_numeric
2383
2384 Given a datatype from column_info, returns a boolean value indicating if
2385 the current RDBMS considers it a numeric value. This controls how
2386 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2387 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2388 be performed instead of the usual C<eq>.
2389
2390 =cut
2391
2392 sub is_datatype_numeric {
2393   my ($self, $dt) = @_;
2394
2395   return 0 unless $dt;
2396
2397   return $dt =~ /^ (?:
2398     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2399   ) $/ix;
2400 }
2401
2402
2403 =head2 create_ddl_dir (EXPERIMENTAL)
2404
2405 =over 4
2406
2407 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2408
2409 =back
2410
2411 Creates a SQL file based on the Schema, for each of the specified
2412 database engines in C<\@databases> in the given directory.
2413 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2414
2415 Given a previous version number, this will also create a file containing
2416 the ALTER TABLE statements to transform the previous schema into the
2417 current one. Note that these statements may contain C<DROP TABLE> or
2418 C<DROP COLUMN> statements that can potentially destroy data.
2419
2420 The file names are created using the C<ddl_filename> method below, please
2421 override this method in your schema if you would like a different file
2422 name format. For the ALTER file, the same format is used, replacing
2423 $version in the name with "$preversion-$version".
2424
2425 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2426 The most common value for this would be C<< { add_drop_table => 1 } >>
2427 to have the SQL produced include a C<DROP TABLE> statement for each table
2428 created. For quoting purposes supply C<quote_table_names> and
2429 C<quote_field_names>.
2430
2431 If no arguments are passed, then the following default values are assumed:
2432
2433 =over 4
2434
2435 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2436
2437 =item version    - $schema->schema_version
2438
2439 =item directory  - './'
2440
2441 =item preversion - <none>
2442
2443 =back
2444
2445 By default, C<\%sqlt_args> will have
2446
2447  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2448
2449 merged with the hash passed in. To disable any of those features, pass in a
2450 hashref like the following
2451
2452  { ignore_constraint_names => 0, # ... other options }
2453
2454
2455 Note that this feature is currently EXPERIMENTAL and may not work correctly
2456 across all databases, or fully handle complex relationships.
2457
2458 WARNING: Please check all SQL files created, before applying them.
2459
2460 =cut
2461
2462 sub create_ddl_dir {
2463   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2464
2465   if(!$dir || !-d $dir) {
2466     carp "No directory given, using ./\n";
2467     $dir = "./";
2468   }
2469   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2470   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2471
2472   my $schema_version = $schema->schema_version || '1.x';
2473   $version ||= $schema_version;
2474
2475   $sqltargs = {
2476     add_drop_table => 1,
2477     ignore_constraint_names => 1,
2478     ignore_index_names => 1,
2479     %{$sqltargs || {}}
2480   };
2481
2482   $self->throw_exception("Can't create a ddl file without SQL::Translator: " . $self->_sqlt_version_error)
2483     if !$self->_sqlt_version_ok;
2484
2485   my $sqlt = SQL::Translator->new( $sqltargs );
2486
2487   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2488   my $sqlt_schema = $sqlt->translate({ data => $schema })
2489     or $self->throw_exception ($sqlt->error);
2490
2491   foreach my $db (@$databases) {
2492     $sqlt->reset();
2493     $sqlt->{schema} = $sqlt_schema;
2494     $sqlt->producer($db);
2495
2496     my $file;
2497     my $filename = $schema->ddl_filename($db, $version, $dir);
2498     if (-e $filename && ($version eq $schema_version )) {
2499       # if we are dumping the current version, overwrite the DDL
2500       carp "Overwriting existing DDL file - $filename";
2501       unlink($filename);
2502     }
2503
2504     my $output = $sqlt->translate;
2505     if(!$output) {
2506       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2507       next;
2508     }
2509     if(!open($file, ">$filename")) {
2510       $self->throw_exception("Can't open $filename for writing ($!)");
2511       next;
2512     }
2513     print $file $output;
2514     close($file);
2515
2516     next unless ($preversion);
2517
2518     require SQL::Translator::Diff;
2519
2520     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2521     if(!-e $prefilename) {
2522       carp("No previous schema file found ($prefilename)");
2523       next;
2524     }
2525
2526     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2527     if(-e $difffile) {
2528       carp("Overwriting existing diff file - $difffile");
2529       unlink($difffile);
2530     }
2531
2532     my $source_schema;
2533     {
2534       my $t = SQL::Translator->new($sqltargs);
2535       $t->debug( 0 );
2536       $t->trace( 0 );
2537
2538       $t->parser( $db )
2539         or $self->throw_exception ($t->error);
2540
2541       my $out = $t->translate( $prefilename )
2542         or $self->throw_exception ($t->error);
2543
2544       $source_schema = $t->schema;
2545
2546       $source_schema->name( $prefilename )
2547         unless ( $source_schema->name );
2548     }
2549
2550     # The "new" style of producers have sane normalization and can support
2551     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2552     # And we have to diff parsed SQL against parsed SQL.
2553     my $dest_schema = $sqlt_schema;
2554
2555     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2556       my $t = SQL::Translator->new($sqltargs);
2557       $t->debug( 0 );
2558       $t->trace( 0 );
2559
2560       $t->parser( $db )
2561         or $self->throw_exception ($t->error);
2562
2563       my $out = $t->translate( $filename )
2564         or $self->throw_exception ($t->error);
2565
2566       $dest_schema = $t->schema;
2567
2568       $dest_schema->name( $filename )
2569         unless $dest_schema->name;
2570     }
2571
2572     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2573                                                   $dest_schema,   $db,
2574                                                   $sqltargs
2575                                                  );
2576     if(!open $file, ">$difffile") {
2577       $self->throw_exception("Can't write to $difffile ($!)");
2578       next;
2579     }
2580     print $file $diff;
2581     close($file);
2582   }
2583 }
2584
2585 =head2 deployment_statements
2586
2587 =over 4
2588
2589 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2590
2591 =back
2592
2593 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2594
2595 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2596 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2597
2598 C<$directory> is used to return statements from files in a previously created
2599 L</create_ddl_dir> directory and is optional. The filenames are constructed
2600 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2601
2602 If no C<$directory> is specified then the statements are constructed on the
2603 fly using L<SQL::Translator> and C<$version> is ignored.
2604
2605 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2606
2607 =cut
2608
2609 sub deployment_statements {
2610   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2611   $type ||= $self->sqlt_type;
2612   $version ||= $schema->schema_version || '1.x';
2613   $dir ||= './';
2614   my $filename = $schema->ddl_filename($type, $version, $dir);
2615   if(-f $filename)
2616   {
2617       my $file;
2618       open($file, "<$filename")
2619         or $self->throw_exception("Can't open $filename ($!)");
2620       my @rows = <$file>;
2621       close($file);
2622       return join('', @rows);
2623   }
2624
2625   $self->throw_exception("Can't deploy without either SQL::Translator or a ddl_dir: " . $self->_sqlt_version_error )
2626     if !$self->_sqlt_version_ok;
2627
2628   # sources needs to be a parser arg, but for simplicty allow at top level
2629   # coming in
2630   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2631       if exists $sqltargs->{sources};
2632
2633   my $tr = SQL::Translator->new(
2634     producer => "SQL::Translator::Producer::${type}",
2635     %$sqltargs,
2636     parser => 'SQL::Translator::Parser::DBIx::Class',
2637     data => $schema,
2638   );
2639
2640   my $ret = $tr->translate
2641     or $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error);
2642
2643   return $ret;
2644 }
2645
2646 sub deploy {
2647   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2648   my $deploy = sub {
2649     my $line = shift;
2650     return if($line =~ /^--/);
2651     return if(!$line);
2652     # next if($line =~ /^DROP/m);
2653     return if($line =~ /^BEGIN TRANSACTION/m);
2654     return if($line =~ /^COMMIT/m);
2655     return if $line =~ /^\s+$/; # skip whitespace only
2656     $self->_query_start($line);
2657     eval {
2658       # do a dbh_do cycle here, as we need some error checking in
2659       # place (even though we will ignore errors)
2660       $self->dbh_do (sub { $_[1]->do($line) });
2661     };
2662     if ($@) {
2663       carp qq{$@ (running "${line}")};
2664     }
2665     $self->_query_end($line);
2666   };
2667   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2668   if (@statements > 1) {
2669     foreach my $statement (@statements) {
2670       $deploy->( $statement );
2671     }
2672   }
2673   elsif (@statements == 1) {
2674     foreach my $line ( split(";\n", $statements[0])) {
2675       $deploy->( $line );
2676     }
2677   }
2678 }
2679
2680 =head2 datetime_parser
2681
2682 Returns the datetime parser class
2683
2684 =cut
2685
2686 sub datetime_parser {
2687   my $self = shift;
2688   return $self->{datetime_parser} ||= do {
2689     $self->build_datetime_parser(@_);
2690   };
2691 }
2692
2693 =head2 datetime_parser_type
2694
2695 Defines (returns) the datetime parser class - currently hardwired to
2696 L<DateTime::Format::MySQL>
2697
2698 =cut
2699
2700 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2701
2702 =head2 build_datetime_parser
2703
2704 See L</datetime_parser>
2705
2706 =cut
2707
2708 sub build_datetime_parser {
2709   my $self = shift;
2710   my $type = $self->datetime_parser_type(@_);
2711   $self->ensure_class_loaded ($type);
2712   return $type;
2713 }
2714
2715
2716 =head2 is_replicating
2717
2718 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2719 replicate from a master database.  Default is undef, which is the result
2720 returned by databases that don't support replication.
2721
2722 =cut
2723
2724 sub is_replicating {
2725     return;
2726
2727 }
2728
2729 =head2 lag_behind_master
2730
2731 Returns a number that represents a certain amount of lag behind a master db
2732 when a given storage is replicating.  The number is database dependent, but
2733 starts at zero and increases with the amount of lag. Default in undef
2734
2735 =cut
2736
2737 sub lag_behind_master {
2738     return;
2739 }
2740
2741 # SQLT version handling
2742 {
2743   my $_sqlt_version_ok;     # private
2744   my $_sqlt_version_error;  # private
2745
2746   sub _sqlt_version_ok {
2747     if (!defined $_sqlt_version_ok) {
2748       eval "use SQL::Translator $minimum_sqlt_version";
2749       if ($@) {
2750         $_sqlt_version_ok = 0;
2751         $_sqlt_version_error = $@;
2752       }
2753       else {
2754         $_sqlt_version_ok = 1;
2755       }
2756     }
2757     return $_sqlt_version_ok;
2758   }
2759
2760   sub _sqlt_version_error {
2761     shift->_sqlt_version_ok unless defined $_sqlt_version_ok;
2762     return $_sqlt_version_error;
2763   }
2764
2765   sub _sqlt_minimum_version { $minimum_sqlt_version };
2766 }
2767
2768 sub DESTROY {
2769   my $self = shift;
2770
2771   $self->_verify_pid if $self->_dbh;
2772
2773   # some databases need this to stop spewing warnings
2774   if (my $dbh = $self->_dbh) {
2775     local $@;
2776     eval { $dbh->disconnect };
2777   }
2778
2779   $self->_dbh(undef);
2780 }
2781
2782 1;
2783
2784 =head1 USAGE NOTES
2785
2786 =head2 DBIx::Class and AutoCommit
2787
2788 DBIx::Class can do some wonderful magic with handling exceptions,
2789 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2790 (the default) combined with C<txn_do> for transaction support.
2791
2792 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2793 in an assumed transaction between commits, and you're telling us you'd
2794 like to manage that manually.  A lot of the magic protections offered by
2795 this module will go away.  We can't protect you from exceptions due to database
2796 disconnects because we don't know anything about how to restart your
2797 transactions.  You're on your own for handling all sorts of exceptional
2798 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2799 be with raw DBI.
2800
2801
2802 =head1 AUTHORS
2803
2804 Matt S. Trout <mst@shadowcatsystems.co.uk>
2805
2806 Andy Grundman <andy@hybridized.org>
2807
2808 =head1 LICENSE
2809
2810 You may distribute this code under the same terms as Perl itself.
2811
2812 =cut