_cond_for_update_delete is hopelessly broken attempting to introspect SQLA1. Replace...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base 'DBIx::Class::Storage';
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17
18 # what version of sqlt do we require if deploy() without a ddl_dir is invoked
19 # when changing also adjust the corresponding author_require in Makefile.PL
20 my $minimum_sqlt_version = '0.11002';
21
22
23 __PACKAGE__->mk_group_accessors('simple' =>
24   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
25      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
26 );
27
28 # the values for these accessors are picked out (and deleted) from
29 # the attribute hashref passed to connect_info
30 my @storage_options = qw/
31   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
32   disable_sth_caching unsafe auto_savepoint
33 /;
34 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
35
36
37 # default cursor class, overridable in connect_info attributes
38 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
39
40 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
41 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
42
43
44 # Each of these methods need _determine_driver called before itself
45 # in order to function reliably. This is a purely DRY optimization
46 my @rdbms_specific_methods = qw/
47   sqlt_type
48   build_datetime_parser
49   datetime_parser_type
50
51   insert
52   insert_bulk
53   update
54   delete
55   select
56   select_single
57 /;
58
59 for my $meth (@rdbms_specific_methods) {
60
61   my $orig = __PACKAGE__->can ($meth)
62     or next;
63
64   no strict qw/refs/;
65   no warnings qw/redefine/;
66   *{__PACKAGE__ ."::$meth"} = sub {
67     if (not $_[0]->_driver_determined) {
68       $_[0]->_determine_driver;
69       goto $_[0]->can($meth);
70     }
71     $orig->(@_);
72   };
73 }
74
75
76 =head1 NAME
77
78 DBIx::Class::Storage::DBI - DBI storage handler
79
80 =head1 SYNOPSIS
81
82   my $schema = MySchema->connect('dbi:SQLite:my.db');
83
84   $schema->storage->debug(1);
85
86   my @stuff = $schema->storage->dbh_do(
87     sub {
88       my ($storage, $dbh, @args) = @_;
89       $dbh->do("DROP TABLE authors");
90     },
91     @column_list
92   );
93
94   $schema->resultset('Book')->search({
95      written_on => $schema->storage->datetime_parser(DateTime->now)
96   });
97
98 =head1 DESCRIPTION
99
100 This class represents the connection to an RDBMS via L<DBI>.  See
101 L<DBIx::Class::Storage> for general information.  This pod only
102 documents DBI-specific methods and behaviors.
103
104 =head1 METHODS
105
106 =cut
107
108 sub new {
109   my $new = shift->next::method(@_);
110
111   $new->transaction_depth(0);
112   $new->_sql_maker_opts({});
113   $new->{savepoints} = [];
114   $new->{_in_dbh_do} = 0;
115   $new->{_dbh_gen} = 0;
116
117   $new;
118 }
119
120 =head2 connect_info
121
122 This method is normally called by L<DBIx::Class::Schema/connection>, which
123 encapsulates its argument list in an arrayref before passing them here.
124
125 The argument list may contain:
126
127 =over
128
129 =item *
130
131 The same 4-element argument set one would normally pass to
132 L<DBI/connect>, optionally followed by
133 L<extra attributes|/DBIx::Class specific connection attributes>
134 recognized by DBIx::Class:
135
136   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
137
138 =item *
139
140 A single code reference which returns a connected
141 L<DBI database handle|DBI/connect> optionally followed by
142 L<extra attributes|/DBIx::Class specific connection attributes> recognized
143 by DBIx::Class:
144
145   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
146
147 =item *
148
149 A single hashref with all the attributes and the dsn/user/password
150 mixed together:
151
152   $connect_info_args = [{
153     dsn => $dsn,
154     user => $user,
155     password => $pass,
156     %dbi_attributes,
157     %extra_attributes,
158   }];
159
160   $connect_info_args = [{
161     dbh_maker => sub { DBI->connect (...) },
162     %dbi_attributes,
163     %extra_attributes,
164   }];
165
166 This is particularly useful for L<Catalyst> based applications, allowing the
167 following config (L<Config::General> style):
168
169   <Model::DB>
170     schema_class   App::DB
171     <connect_info>
172       dsn          dbi:mysql:database=test
173       user         testuser
174       password     TestPass
175       AutoCommit   1
176     </connect_info>
177   </Model::DB>
178
179 The C<dsn>/C<user>/C<password> combination can be substituted by the
180 C<dbh_maker> key whose value is a coderef that returns a connected
181 L<DBI database handle|DBI/connect>
182
183 =back
184
185 Please note that the L<DBI> docs recommend that you always explicitly
186 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
187 recommends that it be set to I<1>, and that you perform transactions
188 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
189 to I<1> if you do not do explicitly set it to zero.  This is the default
190 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
191
192 =head3 DBIx::Class specific connection attributes
193
194 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
195 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
196 the following connection options. These options can be mixed in with your other
197 L<DBI> connection attributes, or placed in a seperate hashref
198 (C<\%extra_attributes>) as shown above.
199
200 Every time C<connect_info> is invoked, any previous settings for
201 these options will be cleared before setting the new ones, regardless of
202 whether any options are specified in the new C<connect_info>.
203
204
205 =over
206
207 =item on_connect_do
208
209 Specifies things to do immediately after connecting or re-connecting to
210 the database.  Its value may contain:
211
212 =over
213
214 =item a scalar
215
216 This contains one SQL statement to execute.
217
218 =item an array reference
219
220 This contains SQL statements to execute in order.  Each element contains
221 a string or a code reference that returns a string.
222
223 =item a code reference
224
225 This contains some code to execute.  Unlike code references within an
226 array reference, its return value is ignored.
227
228 =back
229
230 =item on_disconnect_do
231
232 Takes arguments in the same form as L</on_connect_do> and executes them
233 immediately before disconnecting from the database.
234
235 Note, this only runs if you explicitly call L</disconnect> on the
236 storage object.
237
238 =item on_connect_call
239
240 A more generalized form of L</on_connect_do> that calls the specified
241 C<connect_call_METHOD> methods in your storage driver.
242
243   on_connect_do => 'select 1'
244
245 is equivalent to:
246
247   on_connect_call => [ [ do_sql => 'select 1' ] ]
248
249 Its values may contain:
250
251 =over
252
253 =item a scalar
254
255 Will call the C<connect_call_METHOD> method.
256
257 =item a code reference
258
259 Will execute C<< $code->($storage) >>
260
261 =item an array reference
262
263 Each value can be a method name or code reference.
264
265 =item an array of arrays
266
267 For each array, the first item is taken to be the C<connect_call_> method name
268 or code reference, and the rest are parameters to it.
269
270 =back
271
272 Some predefined storage methods you may use:
273
274 =over
275
276 =item do_sql
277
278 Executes a SQL string or a code reference that returns a SQL string. This is
279 what L</on_connect_do> and L</on_disconnect_do> use.
280
281 It can take:
282
283 =over
284
285 =item a scalar
286
287 Will execute the scalar as SQL.
288
289 =item an arrayref
290
291 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
292 attributes hashref and bind values.
293
294 =item a code reference
295
296 Will execute C<< $code->($storage) >> and execute the return array refs as
297 above.
298
299 =back
300
301 =item datetime_setup
302
303 Execute any statements necessary to initialize the database session to return
304 and accept datetime/timestamp values used with
305 L<DBIx::Class::InflateColumn::DateTime>.
306
307 Only necessary for some databases, see your specific storage driver for
308 implementation details.
309
310 =back
311
312 =item on_disconnect_call
313
314 Takes arguments in the same form as L</on_connect_call> and executes them
315 immediately before disconnecting from the database.
316
317 Calls the C<disconnect_call_METHOD> methods as opposed to the
318 C<connect_call_METHOD> methods called by L</on_connect_call>.
319
320 Note, this only runs if you explicitly call L</disconnect> on the
321 storage object.
322
323 =item disable_sth_caching
324
325 If set to a true value, this option will disable the caching of
326 statement handles via L<DBI/prepare_cached>.
327
328 =item limit_dialect
329
330 Sets the limit dialect. This is useful for JDBC-bridge among others
331 where the remote SQL-dialect cannot be determined by the name of the
332 driver alone. See also L<SQL::Abstract::Limit>.
333
334 =item quote_char
335
336 Specifies what characters to use to quote table and column names. If
337 you use this you will want to specify L</name_sep> as well.
338
339 C<quote_char> expects either a single character, in which case is it
340 is placed on either side of the table/column name, or an arrayref of length
341 2 in which case the table/column name is placed between the elements.
342
343 For example under MySQL you should use C<< quote_char => '`' >>, and for
344 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
345
346 =item name_sep
347
348 This only needs to be used in conjunction with C<quote_char>, and is used to
349 specify the charecter that seperates elements (schemas, tables, columns) from
350 each other. In most cases this is simply a C<.>.
351
352 The consequences of not supplying this value is that L<SQL::Abstract>
353 will assume DBIx::Class' uses of aliases to be complete column
354 names. The output will look like I<"me.name"> when it should actually
355 be I<"me"."name">.
356
357 =item unsafe
358
359 This Storage driver normally installs its own C<HandleError>, sets
360 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
361 all database handles, including those supplied by a coderef.  It does this
362 so that it can have consistent and useful error behavior.
363
364 If you set this option to a true value, Storage will not do its usual
365 modifications to the database handle's attributes, and instead relies on
366 the settings in your connect_info DBI options (or the values you set in
367 your connection coderef, in the case that you are connecting via coderef).
368
369 Note that your custom settings can cause Storage to malfunction,
370 especially if you set a C<HandleError> handler that suppresses exceptions
371 and/or disable C<RaiseError>.
372
373 =item auto_savepoint
374
375 If this option is true, L<DBIx::Class> will use savepoints when nesting
376 transactions, making it possible to recover from failure in the inner
377 transaction without having to abort all outer transactions.
378
379 =item cursor_class
380
381 Use this argument to supply a cursor class other than the default
382 L<DBIx::Class::Storage::DBI::Cursor>.
383
384 =back
385
386 Some real-life examples of arguments to L</connect_info> and
387 L<DBIx::Class::Schema/connect>
388
389   # Simple SQLite connection
390   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
391
392   # Connect via subref
393   ->connect_info([ sub { DBI->connect(...) } ]);
394
395   # Connect via subref in hashref
396   ->connect_info([{
397     dbh_maker => sub { DBI->connect(...) },
398     on_connect_do => 'alter session ...',
399   }]);
400
401   # A bit more complicated
402   ->connect_info(
403     [
404       'dbi:Pg:dbname=foo',
405       'postgres',
406       'my_pg_password',
407       { AutoCommit => 1 },
408       { quote_char => q{"}, name_sep => q{.} },
409     ]
410   );
411
412   # Equivalent to the previous example
413   ->connect_info(
414     [
415       'dbi:Pg:dbname=foo',
416       'postgres',
417       'my_pg_password',
418       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
419     ]
420   );
421
422   # Same, but with hashref as argument
423   # See parse_connect_info for explanation
424   ->connect_info(
425     [{
426       dsn         => 'dbi:Pg:dbname=foo',
427       user        => 'postgres',
428       password    => 'my_pg_password',
429       AutoCommit  => 1,
430       quote_char  => q{"},
431       name_sep    => q{.},
432     }]
433   );
434
435   # Subref + DBIx::Class-specific connection options
436   ->connect_info(
437     [
438       sub { DBI->connect(...) },
439       {
440           quote_char => q{`},
441           name_sep => q{@},
442           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
443           disable_sth_caching => 1,
444       },
445     ]
446   );
447
448
449
450 =cut
451
452 sub connect_info {
453   my ($self, $info_arg) = @_;
454
455   return $self->_connect_info if !$info_arg;
456
457   my @args = @$info_arg;  # take a shallow copy for further mutilation
458   $self->_connect_info([@args]); # copy for _connect_info
459
460
461   # combine/pre-parse arguments depending on invocation style
462
463   my %attrs;
464   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
465     %attrs = %{ $args[1] || {} };
466     @args = $args[0];
467   }
468   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
469     %attrs = %{$args[0]};
470     @args = ();
471     if (my $code = delete $attrs{dbh_maker}) {
472       @args = $code;
473
474       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
475       if (@ignored) {
476         carp sprintf (
477             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
478           . "to the result of 'dbh_maker'",
479
480           join (', ', map { "'$_'" } (@ignored) ),
481         );
482       }
483     }
484     else {
485       @args = delete @attrs{qw/dsn user password/};
486     }
487   }
488   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
489     %attrs = (
490       % { $args[3] || {} },
491       % { $args[4] || {} },
492     );
493     @args = @args[0,1,2];
494   }
495
496   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
497   #  the new set of options
498   $self->_sql_maker(undef);
499   $self->_sql_maker_opts({});
500
501   if(keys %attrs) {
502     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
503       if(my $value = delete $attrs{$storage_opt}) {
504         $self->$storage_opt($value);
505       }
506     }
507     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
508       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
509         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
510       }
511     }
512   }
513
514   if (ref $args[0] eq 'CODE') {
515     # _connect() never looks past $args[0] in this case
516     %attrs = ()
517   } else {
518     %attrs = (
519       %{ $self->_default_dbi_connect_attributes || {} },
520       %attrs,
521     );
522   }
523
524   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
525   $self->_connect_info;
526 }
527
528 sub _default_dbi_connect_attributes {
529   return {
530     AutoCommit => 1,
531     RaiseError => 1,
532     PrintError => 0,
533   };
534 }
535
536 =head2 on_connect_do
537
538 This method is deprecated in favour of setting via L</connect_info>.
539
540 =cut
541
542 =head2 on_disconnect_do
543
544 This method is deprecated in favour of setting via L</connect_info>.
545
546 =cut
547
548 sub _parse_connect_do {
549   my ($self, $type) = @_;
550
551   my $val = $self->$type;
552   return () if not defined $val;
553
554   my @res;
555
556   if (not ref($val)) {
557     push @res, [ 'do_sql', $val ];
558   } elsif (ref($val) eq 'CODE') {
559     push @res, $val;
560   } elsif (ref($val) eq 'ARRAY') {
561     push @res, map { [ 'do_sql', $_ ] } @$val;
562   } else {
563     $self->throw_exception("Invalid type for $type: ".ref($val));
564   }
565
566   return \@res;
567 }
568
569 =head2 dbh_do
570
571 Arguments: ($subref | $method_name), @extra_coderef_args?
572
573 Execute the given $subref or $method_name using the new exception-based
574 connection management.
575
576 The first two arguments will be the storage object that C<dbh_do> was called
577 on and a database handle to use.  Any additional arguments will be passed
578 verbatim to the called subref as arguments 2 and onwards.
579
580 Using this (instead of $self->_dbh or $self->dbh) ensures correct
581 exception handling and reconnection (or failover in future subclasses).
582
583 Your subref should have no side-effects outside of the database, as
584 there is the potential for your subref to be partially double-executed
585 if the database connection was stale/dysfunctional.
586
587 Example:
588
589   my @stuff = $schema->storage->dbh_do(
590     sub {
591       my ($storage, $dbh, @cols) = @_;
592       my $cols = join(q{, }, @cols);
593       $dbh->selectrow_array("SELECT $cols FROM foo");
594     },
595     @column_list
596   );
597
598 =cut
599
600 sub dbh_do {
601   my $self = shift;
602   my $code = shift;
603
604   my $dbh = $self->_get_dbh;
605
606   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
607       || $self->{transaction_depth};
608
609   local $self->{_in_dbh_do} = 1;
610
611   my @result;
612   my $want_array = wantarray;
613
614   eval {
615
616     if($want_array) {
617         @result = $self->$code($dbh, @_);
618     }
619     elsif(defined $want_array) {
620         $result[0] = $self->$code($dbh, @_);
621     }
622     else {
623         $self->$code($dbh, @_);
624     }
625   };
626
627   # ->connected might unset $@ - copy
628   my $exception = $@;
629   if(!$exception) { return $want_array ? @result : $result[0] }
630
631   $self->throw_exception($exception) if $self->connected;
632
633   # We were not connected - reconnect and retry, but let any
634   #  exception fall right through this time
635   carp "Retrying $code after catching disconnected exception: $exception"
636     if $ENV{DBIC_DBIRETRY_DEBUG};
637   $self->_populate_dbh;
638   $self->$code($self->_dbh, @_);
639 }
640
641 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
642 # It also informs dbh_do to bypass itself while under the direction of txn_do,
643 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
644 sub txn_do {
645   my $self = shift;
646   my $coderef = shift;
647
648   ref $coderef eq 'CODE' or $self->throw_exception
649     ('$coderef must be a CODE reference');
650
651   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
652
653   local $self->{_in_dbh_do} = 1;
654
655   my @result;
656   my $want_array = wantarray;
657
658   my $tried = 0;
659   while(1) {
660     eval {
661       $self->_get_dbh;
662
663       $self->txn_begin;
664       if($want_array) {
665           @result = $coderef->(@_);
666       }
667       elsif(defined $want_array) {
668           $result[0] = $coderef->(@_);
669       }
670       else {
671           $coderef->(@_);
672       }
673       $self->txn_commit;
674     };
675
676     # ->connected might unset $@ - copy
677     my $exception = $@;
678     if(!$exception) { return $want_array ? @result : $result[0] }
679
680     if($tried++ || $self->connected) {
681       eval { $self->txn_rollback };
682       my $rollback_exception = $@;
683       if($rollback_exception) {
684         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
685         $self->throw_exception($exception)  # propagate nested rollback
686           if $rollback_exception =~ /$exception_class/;
687
688         $self->throw_exception(
689           "Transaction aborted: ${exception}. "
690           . "Rollback failed: ${rollback_exception}"
691         );
692       }
693       $self->throw_exception($exception)
694     }
695
696     # We were not connected, and was first try - reconnect and retry
697     # via the while loop
698     carp "Retrying $coderef after catching disconnected exception: $exception"
699       if $ENV{DBIC_DBIRETRY_DEBUG};
700     $self->_populate_dbh;
701   }
702 }
703
704 =head2 disconnect
705
706 Our C<disconnect> method also performs a rollback first if the
707 database is not in C<AutoCommit> mode.
708
709 =cut
710
711 sub disconnect {
712   my ($self) = @_;
713
714   if( $self->_dbh ) {
715     my @actions;
716
717     push @actions, ( $self->on_disconnect_call || () );
718     push @actions, $self->_parse_connect_do ('on_disconnect_do');
719
720     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
721
722     $self->_dbh_rollback unless $self->_dbh_autocommit;
723
724     $self->_dbh->disconnect;
725     $self->_dbh(undef);
726     $self->{_dbh_gen}++;
727   }
728 }
729
730 =head2 with_deferred_fk_checks
731
732 =over 4
733
734 =item Arguments: C<$coderef>
735
736 =item Return Value: The return value of $coderef
737
738 =back
739
740 Storage specific method to run the code ref with FK checks deferred or
741 in MySQL's case disabled entirely.
742
743 =cut
744
745 # Storage subclasses should override this
746 sub with_deferred_fk_checks {
747   my ($self, $sub) = @_;
748   $sub->();
749 }
750
751 =head2 connected
752
753 =over
754
755 =item Arguments: none
756
757 =item Return Value: 1|0
758
759 =back
760
761 Verifies that the the current database handle is active and ready to execute
762 an SQL statement (i.e. the connection did not get stale, server is still
763 answering, etc.) This method is used internally by L</dbh>.
764
765 =cut
766
767 sub connected {
768   my $self = shift;
769   return 0 unless $self->_seems_connected;
770
771   #be on the safe side
772   local $self->_dbh->{RaiseError} = 1;
773
774   return $self->_ping;
775 }
776
777 sub _seems_connected {
778   my $self = shift;
779
780   my $dbh = $self->_dbh
781     or return 0;
782
783   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
784     $self->_dbh(undef);
785     $self->{_dbh_gen}++;
786     return 0;
787   }
788   else {
789     $self->_verify_pid;
790     return 0 if !$self->_dbh;
791   }
792
793   return $dbh->FETCH('Active');
794 }
795
796 sub _ping {
797   my $self = shift;
798
799   my $dbh = $self->_dbh or return 0;
800
801   return $dbh->ping;
802 }
803
804 # handle pid changes correctly
805 #  NOTE: assumes $self->_dbh is a valid $dbh
806 sub _verify_pid {
807   my ($self) = @_;
808
809   return if defined $self->_conn_pid && $self->_conn_pid == $$;
810
811   $self->_dbh->{InactiveDestroy} = 1;
812   $self->_dbh(undef);
813   $self->{_dbh_gen}++;
814
815   return;
816 }
817
818 sub ensure_connected {
819   my ($self) = @_;
820
821   unless ($self->connected) {
822     $self->_populate_dbh;
823   }
824 }
825
826 =head2 dbh
827
828 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
829 is guaranteed to be healthy by implicitly calling L</connected>, and if
830 necessary performing a reconnection before returning. Keep in mind that this
831 is very B<expensive> on some database engines. Consider using L<dbh_do>
832 instead.
833
834 =cut
835
836 sub dbh {
837   my ($self) = @_;
838
839   if (not $self->_dbh) {
840     $self->_populate_dbh;
841   } else {
842     $self->ensure_connected;
843   }
844   return $self->_dbh;
845 }
846
847 # this is the internal "get dbh or connect (don't check)" method
848 sub _get_dbh {
849   my $self = shift;
850   $self->_verify_pid if $self->_dbh;
851   $self->_populate_dbh unless $self->_dbh;
852   return $self->_dbh;
853 }
854
855 sub _sql_maker_args {
856     my ($self) = @_;
857
858     return (
859       bindtype=>'columns',
860       array_datatypes => 1,
861       limit_dialect => $self->_get_dbh,
862       %{$self->_sql_maker_opts}
863     );
864 }
865
866 sub sql_maker {
867   my ($self) = @_;
868   unless ($self->_sql_maker) {
869     my $sql_maker_class = $self->sql_maker_class;
870     $self->ensure_class_loaded ($sql_maker_class);
871     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
872   }
873   return $self->_sql_maker;
874 }
875
876 # nothing to do by default
877 sub _rebless {}
878 sub _init {}
879
880 sub _populate_dbh {
881   my ($self) = @_;
882
883   my @info = @{$self->_dbi_connect_info || []};
884   $self->_dbh(undef); # in case ->connected failed we might get sent here
885   $self->_dbh($self->_connect(@info));
886
887   $self->_conn_pid($$);
888   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
889
890   $self->_determine_driver;
891
892   # Always set the transaction depth on connect, since
893   #  there is no transaction in progress by definition
894   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
895
896   $self->_run_connection_actions unless $self->{_in_determine_driver};
897 }
898
899 sub _run_connection_actions {
900   my $self = shift;
901   my @actions;
902
903   push @actions, ( $self->on_connect_call || () );
904   push @actions, $self->_parse_connect_do ('on_connect_do');
905
906   $self->_do_connection_actions(connect_call_ => $_) for @actions;
907 }
908
909 sub _determine_driver {
910   my ($self) = @_;
911
912   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
913     my $started_connected = 0;
914     local $self->{_in_determine_driver} = 1;
915
916     if (ref($self) eq __PACKAGE__) {
917       my $driver;
918       if ($self->_dbh) { # we are connected
919         $driver = $self->_dbh->{Driver}{Name};
920         $started_connected = 1;
921       } else {
922         # if connect_info is a CODEREF, we have no choice but to connect
923         if (ref $self->_dbi_connect_info->[0] &&
924             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
925           $self->_populate_dbh;
926           $driver = $self->_dbh->{Driver}{Name};
927         }
928         else {
929           # try to use dsn to not require being connected, the driver may still
930           # force a connection in _rebless to determine version
931           ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
932         }
933       }
934
935       my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
936       if ($self->load_optional_class($storage_class)) {
937         mro::set_mro($storage_class, 'c3');
938         bless $self, $storage_class;
939         $self->_rebless();
940       }
941     }
942
943     $self->_driver_determined(1);
944
945     $self->_init; # run driver-specific initializations
946
947     $self->_run_connection_actions
948         if !$started_connected && defined $self->_dbh;
949   }
950 }
951
952 sub _do_connection_actions {
953   my $self          = shift;
954   my $method_prefix = shift;
955   my $call          = shift;
956
957   if (not ref($call)) {
958     my $method = $method_prefix . $call;
959     $self->$method(@_);
960   } elsif (ref($call) eq 'CODE') {
961     $self->$call(@_);
962   } elsif (ref($call) eq 'ARRAY') {
963     if (ref($call->[0]) ne 'ARRAY') {
964       $self->_do_connection_actions($method_prefix, $_) for @$call;
965     } else {
966       $self->_do_connection_actions($method_prefix, @$_) for @$call;
967     }
968   } else {
969     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
970   }
971
972   return $self;
973 }
974
975 sub connect_call_do_sql {
976   my $self = shift;
977   $self->_do_query(@_);
978 }
979
980 sub disconnect_call_do_sql {
981   my $self = shift;
982   $self->_do_query(@_);
983 }
984
985 # override in db-specific backend when necessary
986 sub connect_call_datetime_setup { 1 }
987
988 sub _do_query {
989   my ($self, $action) = @_;
990
991   if (ref $action eq 'CODE') {
992     $action = $action->($self);
993     $self->_do_query($_) foreach @$action;
994   }
995   else {
996     # Most debuggers expect ($sql, @bind), so we need to exclude
997     # the attribute hash which is the second argument to $dbh->do
998     # furthermore the bind values are usually to be presented
999     # as named arrayref pairs, so wrap those here too
1000     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1001     my $sql = shift @do_args;
1002     my $attrs = shift @do_args;
1003     my @bind = map { [ undef, $_ ] } @do_args;
1004
1005     $self->_query_start($sql, @bind);
1006     $self->_get_dbh->do($sql, $attrs, @do_args);
1007     $self->_query_end($sql, @bind);
1008   }
1009
1010   return $self;
1011 }
1012
1013 sub _connect {
1014   my ($self, @info) = @_;
1015
1016   $self->throw_exception("You failed to provide any connection info")
1017     if !@info;
1018
1019   my ($old_connect_via, $dbh);
1020
1021   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1022     $old_connect_via = $DBI::connect_via;
1023     $DBI::connect_via = 'connect';
1024   }
1025
1026   eval {
1027     if(ref $info[0] eq 'CODE') {
1028        $dbh = &{$info[0]}
1029     }
1030     else {
1031        $dbh = DBI->connect(@info);
1032     }
1033
1034     if($dbh && !$self->unsafe) {
1035       my $weak_self = $self;
1036       Scalar::Util::weaken($weak_self);
1037       $dbh->{HandleError} = sub {
1038           if ($weak_self) {
1039             $weak_self->throw_exception("DBI Exception: $_[0]");
1040           }
1041           else {
1042             # the handler may be invoked by something totally out of
1043             # the scope of DBIC
1044             croak ("DBI Exception: $_[0]");
1045           }
1046       };
1047       $dbh->{ShowErrorStatement} = 1;
1048       $dbh->{RaiseError} = 1;
1049       $dbh->{PrintError} = 0;
1050     }
1051   };
1052
1053   $DBI::connect_via = $old_connect_via if $old_connect_via;
1054
1055   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1056     if !$dbh || $@;
1057
1058   $self->_dbh_autocommit($dbh->{AutoCommit});
1059
1060   $dbh;
1061 }
1062
1063 sub svp_begin {
1064   my ($self, $name) = @_;
1065
1066   $name = $self->_svp_generate_name
1067     unless defined $name;
1068
1069   $self->throw_exception ("You can't use savepoints outside a transaction")
1070     if $self->{transaction_depth} == 0;
1071
1072   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1073     unless $self->can('_svp_begin');
1074
1075   push @{ $self->{savepoints} }, $name;
1076
1077   $self->debugobj->svp_begin($name) if $self->debug;
1078
1079   return $self->_svp_begin($name);
1080 }
1081
1082 sub svp_release {
1083   my ($self, $name) = @_;
1084
1085   $self->throw_exception ("You can't use savepoints outside a transaction")
1086     if $self->{transaction_depth} == 0;
1087
1088   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1089     unless $self->can('_svp_release');
1090
1091   if (defined $name) {
1092     $self->throw_exception ("Savepoint '$name' does not exist")
1093       unless grep { $_ eq $name } @{ $self->{savepoints} };
1094
1095     # Dig through the stack until we find the one we are releasing.  This keeps
1096     # the stack up to date.
1097     my $svp;
1098
1099     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1100   } else {
1101     $name = pop @{ $self->{savepoints} };
1102   }
1103
1104   $self->debugobj->svp_release($name) if $self->debug;
1105
1106   return $self->_svp_release($name);
1107 }
1108
1109 sub svp_rollback {
1110   my ($self, $name) = @_;
1111
1112   $self->throw_exception ("You can't use savepoints outside a transaction")
1113     if $self->{transaction_depth} == 0;
1114
1115   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1116     unless $self->can('_svp_rollback');
1117
1118   if (defined $name) {
1119       # If they passed us a name, verify that it exists in the stack
1120       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1121           $self->throw_exception("Savepoint '$name' does not exist!");
1122       }
1123
1124       # Dig through the stack until we find the one we are releasing.  This keeps
1125       # the stack up to date.
1126       while(my $s = pop(@{ $self->{savepoints} })) {
1127           last if($s eq $name);
1128       }
1129       # Add the savepoint back to the stack, as a rollback doesn't remove the
1130       # named savepoint, only everything after it.
1131       push(@{ $self->{savepoints} }, $name);
1132   } else {
1133       # We'll assume they want to rollback to the last savepoint
1134       $name = $self->{savepoints}->[-1];
1135   }
1136
1137   $self->debugobj->svp_rollback($name) if $self->debug;
1138
1139   return $self->_svp_rollback($name);
1140 }
1141
1142 sub _svp_generate_name {
1143     my ($self) = @_;
1144
1145     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1146 }
1147
1148 sub txn_begin {
1149   my $self = shift;
1150   if($self->{transaction_depth} == 0) {
1151     $self->debugobj->txn_begin()
1152       if $self->debug;
1153     $self->_dbh_begin_work;
1154   }
1155   elsif ($self->auto_savepoint) {
1156     $self->svp_begin;
1157   }
1158   $self->{transaction_depth}++;
1159 }
1160
1161 sub _dbh_begin_work {
1162   my $self = shift;
1163
1164   # if the user is utilizing txn_do - good for him, otherwise we need to
1165   # ensure that the $dbh is healthy on BEGIN.
1166   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1167   # will be replaced by a failure of begin_work itself (which will be
1168   # then retried on reconnect)
1169   if ($self->{_in_dbh_do}) {
1170     $self->_dbh->begin_work;
1171   } else {
1172     $self->dbh_do(sub { $_[1]->begin_work });
1173   }
1174 }
1175
1176 sub txn_commit {
1177   my $self = shift;
1178   if ($self->{transaction_depth} == 1) {
1179     $self->debugobj->txn_commit()
1180       if ($self->debug);
1181     $self->_dbh_commit;
1182     $self->{transaction_depth} = 0
1183       if $self->_dbh_autocommit;
1184   }
1185   elsif($self->{transaction_depth} > 1) {
1186     $self->{transaction_depth}--;
1187     $self->svp_release
1188       if $self->auto_savepoint;
1189   }
1190 }
1191
1192 sub _dbh_commit {
1193   my $self = shift;
1194   my $dbh  = $self->_dbh
1195     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1196   $dbh->commit;
1197 }
1198
1199 sub txn_rollback {
1200   my $self = shift;
1201   my $dbh = $self->_dbh;
1202   eval {
1203     if ($self->{transaction_depth} == 1) {
1204       $self->debugobj->txn_rollback()
1205         if ($self->debug);
1206       $self->{transaction_depth} = 0
1207         if $self->_dbh_autocommit;
1208       $self->_dbh_rollback;
1209     }
1210     elsif($self->{transaction_depth} > 1) {
1211       $self->{transaction_depth}--;
1212       if ($self->auto_savepoint) {
1213         $self->svp_rollback;
1214         $self->svp_release;
1215       }
1216     }
1217     else {
1218       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1219     }
1220   };
1221   if ($@) {
1222     my $error = $@;
1223     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1224     $error =~ /$exception_class/ and $self->throw_exception($error);
1225     # ensure that a failed rollback resets the transaction depth
1226     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1227     $self->throw_exception($error);
1228   }
1229 }
1230
1231 sub _dbh_rollback {
1232   my $self = shift;
1233   my $dbh  = $self->_dbh
1234     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1235   $dbh->rollback;
1236 }
1237
1238 # This used to be the top-half of _execute.  It was split out to make it
1239 #  easier to override in NoBindVars without duping the rest.  It takes up
1240 #  all of _execute's args, and emits $sql, @bind.
1241 sub _prep_for_execute {
1242   my ($self, $op, $extra_bind, $ident, $args) = @_;
1243
1244   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1245     $ident = $ident->from();
1246   }
1247
1248   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1249
1250   unshift(@bind,
1251     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1252       if $extra_bind;
1253   return ($sql, \@bind);
1254 }
1255
1256
1257 sub _fix_bind_params {
1258     my ($self, @bind) = @_;
1259
1260     ### Turn @bind from something like this:
1261     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1262     ### to this:
1263     ###   ( "'1'", "'1'", "'3'" )
1264     return
1265         map {
1266             if ( defined( $_ && $_->[1] ) ) {
1267                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1268             }
1269             else { q{'NULL'}; }
1270         } @bind;
1271 }
1272
1273 sub _query_start {
1274     my ( $self, $sql, @bind ) = @_;
1275
1276     if ( $self->debug ) {
1277         @bind = $self->_fix_bind_params(@bind);
1278
1279         $self->debugobj->query_start( $sql, @bind );
1280     }
1281 }
1282
1283 sub _query_end {
1284     my ( $self, $sql, @bind ) = @_;
1285
1286     if ( $self->debug ) {
1287         @bind = $self->_fix_bind_params(@bind);
1288         $self->debugobj->query_end( $sql, @bind );
1289     }
1290 }
1291
1292 sub _dbh_execute {
1293   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1294
1295   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1296
1297   $self->_query_start( $sql, @$bind );
1298
1299   my $sth = $self->sth($sql,$op);
1300
1301   my $placeholder_index = 1;
1302
1303   foreach my $bound (@$bind) {
1304     my $attributes = {};
1305     my($column_name, @data) = @$bound;
1306
1307     if ($bind_attributes) {
1308       $attributes = $bind_attributes->{$column_name}
1309       if defined $bind_attributes->{$column_name};
1310     }
1311
1312     foreach my $data (@data) {
1313       my $ref = ref $data;
1314       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1315
1316       $sth->bind_param($placeholder_index, $data, $attributes);
1317       $placeholder_index++;
1318     }
1319   }
1320
1321   # Can this fail without throwing an exception anyways???
1322   my $rv = $sth->execute();
1323   $self->throw_exception($sth->errstr) if !$rv;
1324
1325   $self->_query_end( $sql, @$bind );
1326
1327   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1328 }
1329
1330 sub _execute {
1331     my $self = shift;
1332     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1333 }
1334
1335 sub insert {
1336   my ($self, $source, $to_insert) = @_;
1337
1338   my $ident = $source->from;
1339   my $bind_attributes = $self->source_bind_attributes($source);
1340
1341   my $updated_cols = {};
1342
1343   foreach my $col ( $source->columns ) {
1344     if ( !defined $to_insert->{$col} ) {
1345       my $col_info = $source->column_info($col);
1346
1347       if ( $col_info->{auto_nextval} ) {
1348         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1349           'nextval',
1350           $col_info->{sequence} ||
1351             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
1352         );
1353       }
1354     }
1355   }
1356
1357   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1358
1359   return $updated_cols;
1360 }
1361
1362 ## Still not quite perfect, and EXPERIMENTAL
1363 ## Currently it is assumed that all values passed will be "normal", i.e. not
1364 ## scalar refs, or at least, all the same type as the first set, the statement is
1365 ## only prepped once.
1366 sub insert_bulk {
1367   my ($self, $source, $cols, $data) = @_;
1368
1369   my %colvalues;
1370   @colvalues{@$cols} = (0..$#$cols);
1371
1372   for my $i (0..$#$cols) {
1373     my $first_val = $data->[0][$i];
1374     next unless ref $first_val eq 'SCALAR';
1375
1376     $colvalues{ $cols->[$i] } = $first_val;
1377   }
1378
1379   # check for bad data and stringify stringifiable objects
1380   my $bad_slice = sub {
1381     my ($msg, $col_idx, $slice_idx) = @_;
1382     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1383       $msg,
1384       $cols->[$col_idx],
1385       do {
1386         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1387         Data::Dumper::Concise::Dumper({
1388           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1389         }),
1390       }
1391     );
1392   };
1393
1394   for my $datum_idx (0..$#$data) {
1395     my $datum = $data->[$datum_idx];
1396
1397     for my $col_idx (0..$#$cols) {
1398       my $val            = $datum->[$col_idx];
1399       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1400       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1401
1402       if ($is_literal_sql) {
1403         if (not ref $val) {
1404           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1405         }
1406         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1407           $bad_slice->("$reftype reference found where literal SQL expected",
1408             $col_idx, $datum_idx);
1409         }
1410         elsif ($$val ne $$sqla_bind){
1411           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1412             $col_idx, $datum_idx);
1413         }
1414       }
1415       elsif (my $reftype = ref $val) {
1416         require overload;
1417         if (overload::Method($val, '""')) {
1418           $datum->[$col_idx] = "".$val;
1419         }
1420         else {
1421           $bad_slice->("$reftype reference found where bind expected",
1422             $col_idx, $datum_idx);
1423         }
1424       }
1425     }
1426   }
1427
1428   my ($sql, $bind) = $self->_prep_for_execute (
1429     'insert', undef, $source, [\%colvalues]
1430   );
1431   my @bind = @$bind;
1432
1433   my $empty_bind = 1 if (not @bind) &&
1434     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1435
1436   if ((not @bind) && (not $empty_bind)) {
1437     $self->throw_exception(
1438       'Cannot insert_bulk without support for placeholders'
1439     );
1440   }
1441
1442   $self->_query_start( $sql, ['__BULK__'] );
1443   my $sth = $self->sth($sql);
1444
1445   my $rv = do {
1446     if ($empty_bind) {
1447       # bind_param_array doesn't work if there are no binds
1448       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1449     }
1450     else {
1451 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1452       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1453     }
1454   };
1455
1456   $self->_query_end( $sql, ['__BULK__'] );
1457
1458   return (wantarray ? ($rv, $sth, @bind) : $rv);
1459 }
1460
1461 sub _execute_array {
1462   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1463
1464   my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
1465
1466   ## This must be an arrayref, else nothing works!
1467   my $tuple_status = [];
1468
1469   ## Get the bind_attributes, if any exist
1470   my $bind_attributes = $self->source_bind_attributes($source);
1471
1472   ## Bind the values and execute
1473   my $placeholder_index = 1;
1474
1475   foreach my $bound (@$bind) {
1476
1477     my $attributes = {};
1478     my ($column_name, $data_index) = @$bound;
1479
1480     if( $bind_attributes ) {
1481       $attributes = $bind_attributes->{$column_name}
1482       if defined $bind_attributes->{$column_name};
1483     }
1484
1485     my @data = map { $_->[$data_index] } @$data;
1486
1487     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1488     $placeholder_index++;
1489   }
1490
1491   my $rv = eval {
1492     $self->_dbh_execute_array($sth, $tuple_status, @extra);
1493   };
1494   my $err = $@ || $sth->errstr;
1495
1496 # Statement must finish even if there was an exception.
1497   eval { $sth->finish };
1498   $err = $@ unless $err;
1499
1500   if ($err) {
1501     my $i = 0;
1502     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1503
1504     $self->throw_exception("Unexpected populate error: $err")
1505       if ($i > $#$tuple_status);
1506
1507     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1508       ($tuple_status->[$i][1] || $err),
1509       Data::Dumper::Concise::Dumper({
1510         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1511       }),
1512     );
1513   }
1514
1515   $guard->commit if $guard;
1516
1517   return $rv;
1518 }
1519
1520 sub _dbh_execute_array {
1521     my ($self, $sth, $tuple_status, @extra) = @_;
1522
1523     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1524 }
1525
1526 sub _dbh_execute_inserts_with_no_binds {
1527   my ($self, $sth, $count) = @_;
1528
1529   my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
1530
1531   eval {
1532     my $dbh = $self->_get_dbh;
1533     local $dbh->{RaiseError} = 1;
1534     local $dbh->{PrintError} = 0;
1535
1536     $sth->execute foreach 1..$count;
1537   };
1538   my $exception = $@;
1539
1540 # Make sure statement is finished even if there was an exception.
1541   eval { $sth->finish };
1542   $exception = $@ unless $exception;
1543
1544   $self->throw_exception($exception) if $exception;
1545
1546   $guard->commit if $guard;
1547
1548   return $count;
1549 }
1550
1551 sub update {
1552   my ($self, $source, $data, $where, @args) = @_; 
1553
1554   my $bind_attrs = $self->source_bind_attributes($source);
1555   $where = $self->_strip_cond_qualifiers ($where);
1556
1557   return $self->_execute('update' => [], $source, $bind_attrs, $data, $where, @args);
1558 }
1559
1560
1561 sub delete {
1562   my ($self, $source, $where, @args) = @_;
1563
1564   my $bind_attrs = $self->source_bind_attributes($source);
1565   $where = $self->_strip_cond_qualifiers ($where);
1566
1567   return $self->_execute('delete' => [], $source, $bind_attrs, $where, @args);
1568 }
1569
1570 sub _strip_cond_qualifiers {
1571   my ($self, $where) = @_;
1572
1573   my $sqlmaker = $self->sql_maker;
1574   my ($sql, @bind) = $sqlmaker->_recurse_where($where);
1575   return undef unless $sql;
1576
1577   my ($qquot, $qsep) = map { quotemeta $_ } ( ($sqlmaker->quote_char||''), ($sqlmaker->name_sep||'.') );
1578   $sql =~ s/ (?: $qquot [\w\-]+ $qquot | [\w\-]+ ) $qsep //gx;
1579
1580   return \[$sql, @bind];
1581 }
1582
1583 # We were sent here because the $rs contains a complex search
1584 # which will require a subquery to select the correct rows
1585 # (i.e. joined or limited resultsets)
1586 #
1587 # Genarating a single PK column subquery is trivial and supported
1588 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1589 # Look at _multipk_update_delete()
1590 sub _subq_update_delete {
1591   my $self = shift;
1592   my ($rs, $op, $values) = @_;
1593
1594   my $rsrc = $rs->result_source;
1595
1596   # we already check this, but double check naively just in case. Should be removed soon
1597   my $sel = $rs->_resolved_attrs->{select};
1598   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1599   my @pcols = $rsrc->primary_columns;
1600   if (@$sel != @pcols) {
1601     $self->throw_exception (
1602       'Subquery update/delete can not be called on resultsets selecting a'
1603      .' number of columns different than the number of primary keys'
1604     );
1605   }
1606
1607   if (@pcols == 1) {
1608     return $self->$op (
1609       $rsrc,
1610       $op eq 'update' ? $values : (),
1611       { $pcols[0] => { -in => $rs->as_query } },
1612     );
1613   }
1614
1615   else {
1616     return $self->_multipk_update_delete (@_);
1617   }
1618 }
1619
1620 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1621 # resultset update/delete involving subqueries. So by default resort
1622 # to simple (and inefficient) delete_all style per-row opearations,
1623 # while allowing specific storages to override this with a faster
1624 # implementation.
1625 #
1626 sub _multipk_update_delete {
1627   return shift->_per_row_update_delete (@_);
1628 }
1629
1630 # This is the default loop used to delete/update rows for multi PK
1631 # resultsets, and used by mysql exclusively (because it can't do anything
1632 # else).
1633 #
1634 # We do not use $row->$op style queries, because resultset update/delete
1635 # is not expected to cascade (this is what delete_all/update_all is for).
1636 #
1637 # There should be no race conditions as the entire operation is rolled
1638 # in a transaction.
1639 #
1640 sub _per_row_update_delete {
1641   my $self = shift;
1642   my ($rs, $op, $values) = @_;
1643
1644   my $rsrc = $rs->result_source;
1645   my @pcols = $rsrc->primary_columns;
1646
1647   my $guard = $self->txn_scope_guard;
1648
1649   # emulate the return value of $sth->execute for non-selects
1650   my $row_cnt = '0E0';
1651
1652   my $subrs_cur = $rs->cursor;
1653   while (my @pks = $subrs_cur->next) {
1654
1655     my $cond;
1656     for my $i (0.. $#pcols) {
1657       $cond->{$pcols[$i]} = $pks[$i];
1658     }
1659
1660     $self->$op (
1661       $rsrc,
1662       $op eq 'update' ? $values : (),
1663       $cond,
1664     );
1665
1666     $row_cnt++;
1667   }
1668
1669   $guard->commit;
1670
1671   return $row_cnt;
1672 }
1673
1674 sub _select {
1675   my $self = shift;
1676
1677   # localization is neccessary as
1678   # 1) there is no infrastructure to pass this around before SQLA2
1679   # 2) _select_args sets it and _prep_for_execute consumes it
1680   my $sql_maker = $self->sql_maker;
1681   local $sql_maker->{_dbic_rs_attrs};
1682
1683   return $self->_execute($self->_select_args(@_));
1684 }
1685
1686 sub _select_args_to_query {
1687   my $self = shift;
1688
1689   # localization is neccessary as
1690   # 1) there is no infrastructure to pass this around before SQLA2
1691   # 2) _select_args sets it and _prep_for_execute consumes it
1692   my $sql_maker = $self->sql_maker;
1693   local $sql_maker->{_dbic_rs_attrs};
1694
1695   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1696   #  = $self->_select_args($ident, $select, $cond, $attrs);
1697   my ($op, $bind, $ident, $bind_attrs, @args) =
1698     $self->_select_args(@_);
1699
1700   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1701   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1702   $prepared_bind ||= [];
1703
1704   return wantarray
1705     ? ($sql, $prepared_bind, $bind_attrs)
1706     : \[ "($sql)", @$prepared_bind ]
1707   ;
1708 }
1709
1710 sub _select_args {
1711   my ($self, $ident, $select, $where, $attrs) = @_;
1712
1713   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1714
1715   my $sql_maker = $self->sql_maker;
1716   $sql_maker->{_dbic_rs_attrs} = {
1717     %$attrs,
1718     select => $select,
1719     from => $ident,
1720     where => $where,
1721     $rs_alias
1722       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1723       : ()
1724     ,
1725   };
1726
1727   # calculate bind_attrs before possible $ident mangling
1728   my $bind_attrs = {};
1729   for my $alias (keys %$alias2source) {
1730     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1731     for my $col (keys %$bindtypes) {
1732
1733       my $fqcn = join ('.', $alias, $col);
1734       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1735
1736       # Unqialified column names are nice, but at the same time can be
1737       # rather ambiguous. What we do here is basically go along with
1738       # the loop, adding an unqualified column slot to $bind_attrs,
1739       # alongside the fully qualified name. As soon as we encounter
1740       # another column by that name (which would imply another table)
1741       # we unset the unqualified slot and never add any info to it
1742       # to avoid erroneous type binding. If this happens the users
1743       # only choice will be to fully qualify his column name
1744
1745       if (exists $bind_attrs->{$col}) {
1746         $bind_attrs->{$col} = {};
1747       }
1748       else {
1749         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1750       }
1751     }
1752   }
1753
1754   # adjust limits
1755   if (
1756     $attrs->{software_limit}
1757       ||
1758     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1759   ) {
1760     $attrs->{software_limit} = 1;
1761   }
1762   else {
1763     $self->throw_exception("rows attribute must be positive if present")
1764       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1765
1766     # MySQL actually recommends this approach.  I cringe.
1767     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1768   }
1769
1770   my @limit;
1771
1772   # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
1773   # otherwise delegate the limiting to the storage, unless software limit was requested
1774   if (
1775     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1776        ||
1777     ( $attrs->{group_by} && @{$attrs->{group_by}} &&
1778       $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
1779   ) {
1780     ($ident, $select, $where, $attrs)
1781       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1782   }
1783   elsif (! $attrs->{software_limit} ) {
1784     push @limit, $attrs->{rows}, $attrs->{offset};
1785   }
1786
1787 ###
1788   # This would be the point to deflate anything found in $where
1789   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1790   # expect a row object. And all we have is a resultsource (it is trivial
1791   # to extract deflator coderefs via $alias2source above).
1792   #
1793   # I don't see a way forward other than changing the way deflators are
1794   # invoked, and that's just bad...
1795 ###
1796
1797   my $order = { map
1798     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1799     (qw/order_by group_by having/ )
1800   };
1801
1802   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1803 }
1804
1805 #
1806 # This is the code producing joined subqueries like:
1807 # SELECT me.*, other.* FROM ( SELECT me.* FROM ... ) JOIN other ON ... 
1808 #
1809 sub _adjust_select_args_for_complex_prefetch {
1810   my ($self, $from, $select, $where, $attrs) = @_;
1811
1812   $self->throw_exception ('Nothing to prefetch... how did we get here?!')
1813     if not @{$attrs->{_prefetch_select}};
1814
1815   $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
1816     if (ref $from ne 'ARRAY' || ref $from->[0] ne 'HASH' || ref $from->[1] ne 'ARRAY');
1817
1818
1819   # generate inner/outer attribute lists, remove stuff that doesn't apply
1820   my $outer_attrs = { %$attrs };
1821   delete $outer_attrs->{$_} for qw/where bind rows offset group_by having/;
1822
1823   my $inner_attrs = { %$attrs };
1824   delete $inner_attrs->{$_} for qw/for collapse _prefetch_select _collapse_order_by select as/;
1825
1826
1827   # bring over all non-collapse-induced order_by into the inner query (if any)
1828   # the outer one will have to keep them all
1829   delete $inner_attrs->{order_by};
1830   if (my $ord_cnt = @{$outer_attrs->{order_by}} - @{$outer_attrs->{_collapse_order_by}} ) {
1831     $inner_attrs->{order_by} = [
1832       @{$outer_attrs->{order_by}}[ 0 .. $ord_cnt - 1]
1833     ];
1834   }
1835
1836
1837   # generate the inner/outer select lists
1838   # for inside we consider only stuff *not* brought in by the prefetch
1839   # on the outside we substitute any function for its alias
1840   my $outer_select = [ @$select ];
1841   my $inner_select = [];
1842   for my $i (0 .. ( @$outer_select - @{$outer_attrs->{_prefetch_select}} - 1) ) {
1843     my $sel = $outer_select->[$i];
1844
1845     if (ref $sel eq 'HASH' ) {
1846       $sel->{-as} ||= $attrs->{as}[$i];
1847       $outer_select->[$i] = join ('.', $attrs->{alias}, ($sel->{-as} || "inner_column_$i") );
1848     }
1849
1850     push @$inner_select, $sel;
1851   }
1852
1853   # normalize a copy of $from, so it will be easier to work with further
1854   # down (i.e. promote the initial hashref to an AoH)
1855   $from = [ @$from ];
1856   $from->[0] = [ $from->[0] ];
1857   my %original_join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
1858
1859
1860   # decide which parts of the join will remain in either part of
1861   # the outer/inner query
1862
1863   # First we compose a list of which aliases are used in restrictions
1864   # (i.e. conditions/order/grouping/etc). Since we do not have
1865   # introspectable SQLA, we fall back to ugly scanning of raw SQL for
1866   # WHERE, and for pieces of ORDER BY in order to determine which aliases
1867   # need to appear in the resulting sql.
1868   # It may not be very efficient, but it's a reasonable stop-gap
1869   # Also unqualified column names will not be considered, but more often
1870   # than not this is actually ok
1871   #
1872   # In the same loop we enumerate part of the selection aliases, as
1873   # it requires the same sqla hack for the time being
1874   my ($restrict_aliases, $select_aliases, $prefetch_aliases);
1875   {
1876     # produce stuff unquoted, so it can be scanned
1877     my $sql_maker = $self->sql_maker;
1878     local $sql_maker->{quote_char};
1879     my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1880     $sep = "\Q$sep\E";
1881
1882     my $non_prefetch_select_sql = $sql_maker->_recurse_fields ($inner_select);
1883     my $prefetch_select_sql = $sql_maker->_recurse_fields ($outer_attrs->{_prefetch_select});
1884     my $where_sql = $sql_maker->where ($where);
1885     my $group_by_sql = $sql_maker->_order_by({
1886       map { $_ => $inner_attrs->{$_} } qw/group_by having/
1887     });
1888     my @non_prefetch_order_by_chunks = (map
1889       { ref $_ ? $_->[0] : $_ }
1890       $sql_maker->_order_by_chunks ($inner_attrs->{order_by})
1891     );
1892
1893
1894     for my $alias (keys %original_join_info) {
1895       my $seen_re = qr/\b $alias $sep/x;
1896
1897       for my $piece ($where_sql, $group_by_sql, @non_prefetch_order_by_chunks ) {
1898         if ($piece =~ $seen_re) {
1899           $restrict_aliases->{$alias} = 1;
1900         }
1901       }
1902
1903       if ($non_prefetch_select_sql =~ $seen_re) {
1904           $select_aliases->{$alias} = 1;
1905       }
1906
1907       if ($prefetch_select_sql =~ $seen_re) {
1908           $prefetch_aliases->{$alias} = 1;
1909       }
1910
1911     }
1912   }
1913
1914   # Add any non-left joins to the restriction list (such joins are indeed restrictions)
1915   for my $j (values %original_join_info) {
1916     my $alias = $j->{-alias} or next;
1917     $restrict_aliases->{$alias} = 1 if (
1918       (not $j->{-join_type})
1919         or
1920       ($j->{-join_type} !~ /^left (?: \s+ outer)? $/xi)
1921     );
1922   }
1923
1924   # mark all join parents as mentioned
1925   # (e.g.  join => { cds => 'tracks' } - tracks will need to bring cds too )
1926   for my $collection ($restrict_aliases, $select_aliases) {
1927     for my $alias (keys %$collection) {
1928       $collection->{$_} = 1
1929         for (@{ $original_join_info{$alias}{-join_path} || [] });
1930     }
1931   }
1932
1933   # construct the inner $from for the subquery
1934   my %inner_joins = (map { %{$_ || {}} } ($restrict_aliases, $select_aliases) );
1935   my @inner_from;
1936   for my $j (@$from) {
1937     push @inner_from, $j if $inner_joins{$j->[0]{-alias}};
1938   }
1939
1940   # if a multi-type join was needed in the subquery ("multi" is indicated by
1941   # presence in {collapse}) - add a group_by to simulate the collapse in the subq
1942   unless ($inner_attrs->{group_by}) {
1943     for my $alias (keys %inner_joins) {
1944
1945       # the dot comes from some weirdness in collapse
1946       # remove after the rewrite
1947       if ($attrs->{collapse}{".$alias"}) {
1948         $inner_attrs->{group_by} ||= $inner_select;
1949         last;
1950       }
1951     }
1952   }
1953
1954   # demote the inner_from head
1955   $inner_from[0] = $inner_from[0][0];
1956
1957   # generate the subquery
1958   my $subq = $self->_select_args_to_query (
1959     \@inner_from,
1960     $inner_select,
1961     $where,
1962     $inner_attrs,
1963   );
1964
1965   my $subq_joinspec = {
1966     -alias => $attrs->{alias},
1967     -source_handle => $inner_from[0]{-source_handle},
1968     $attrs->{alias} => $subq,
1969   };
1970
1971   # Generate the outer from - this is relatively easy (really just replace
1972   # the join slot with the subquery), with a major caveat - we can not
1973   # join anything that is non-selecting (not part of the prefetch), but at
1974   # the same time is a multi-type relationship, as it will explode the result.
1975   #
1976   # There are two possibilities here
1977   # - either the join is non-restricting, in which case we simply throw it away
1978   # - it is part of the restrictions, in which case we need to collapse the outer
1979   #   result by tackling yet another group_by to the outside of the query
1980
1981   # so first generate the outer_from, up to the substitution point
1982   my @outer_from;
1983   while (my $j = shift @$from) {
1984     if ($j->[0]{-alias} eq $attrs->{alias}) { # time to swap
1985       push @outer_from, [
1986         $subq_joinspec,
1987         @{$j}[1 .. $#$j],
1988       ];
1989       last; # we'll take care of what's left in $from below
1990     }
1991     else {
1992       push @outer_from, $j;
1993     }
1994   }
1995
1996   # see what's left - throw away if not selecting/restricting
1997   # also throw in a group_by if restricting to guard against
1998   # cross-join explosions
1999   #
2000   while (my $j = shift @$from) {
2001     my $alias = $j->[0]{-alias};
2002
2003     if ($select_aliases->{$alias} || $prefetch_aliases->{$alias}) {
2004       push @outer_from, $j;
2005     }
2006     elsif ($restrict_aliases->{$alias}) {
2007       push @outer_from, $j;
2008
2009       # FIXME - this should be obviated by SQLA2, as I'll be able to 
2010       # have restrict_inner and restrict_outer... or something to that
2011       # effect... I think...
2012
2013       # FIXME2 - I can't find a clean way to determine if a particular join
2014       # is a multi - instead I am just treating everything as a potential
2015       # explosive join (ribasushi)
2016       #
2017       # if (my $handle = $j->[0]{-source_handle}) {
2018       #   my $rsrc = $handle->resolve;
2019       #   ... need to bail out of the following if this is not a multi,
2020       #       as it will be much easier on the db ...
2021
2022           $outer_attrs->{group_by} ||= $outer_select;
2023       # }
2024     }
2025   }
2026
2027   # demote the outer_from head
2028   $outer_from[0] = $outer_from[0][0];
2029
2030   # This is totally horrific - the $where ends up in both the inner and outer query
2031   # Unfortunately not much can be done until SQLA2 introspection arrives, and even
2032   # then if where conditions apply to the *right* side of the prefetch, you may have
2033   # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
2034   # the outer select to exclude joins you didin't want in the first place
2035   #
2036   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
2037   return (\@outer_from, $outer_select, $where, $outer_attrs);
2038 }
2039
2040 sub _resolve_ident_sources {
2041   my ($self, $ident) = @_;
2042
2043   my $alias2source = {};
2044   my $rs_alias;
2045
2046   # the reason this is so contrived is that $ident may be a {from}
2047   # structure, specifying multiple tables to join
2048   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
2049     # this is compat mode for insert/update/delete which do not deal with aliases
2050     $alias2source->{me} = $ident;
2051     $rs_alias = 'me';
2052   }
2053   elsif (ref $ident eq 'ARRAY') {
2054
2055     for (@$ident) {
2056       my $tabinfo;
2057       if (ref $_ eq 'HASH') {
2058         $tabinfo = $_;
2059         $rs_alias = $tabinfo->{-alias};
2060       }
2061       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
2062         $tabinfo = $_->[0];
2063       }
2064
2065       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
2066         if ($tabinfo->{-source_handle});
2067     }
2068   }
2069
2070   return ($alias2source, $rs_alias);
2071 }
2072
2073 # Takes $ident, \@column_names
2074 #
2075 # returns { $column_name => \%column_info, ... }
2076 # also note: this adds -result_source => $rsrc to the column info
2077 #
2078 # usage:
2079 #   my $col_sources = $self->_resolve_column_info($ident, @column_names);
2080 sub _resolve_column_info {
2081   my ($self, $ident, $colnames) = @_;
2082   my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
2083
2084   my $sep = $self->_sql_maker_opts->{name_sep} || '.';
2085   $sep = "\Q$sep\E";
2086
2087   my (%return, %seen_cols);
2088
2089   # compile a global list of column names, to be able to properly
2090   # disambiguate unqualified column names (if at all possible)
2091   for my $alias (keys %$alias2src) {
2092     my $rsrc = $alias2src->{$alias};
2093     for my $colname ($rsrc->columns) {
2094       push @{$seen_cols{$colname}}, $alias;
2095     }
2096   }
2097
2098   COLUMN:
2099   foreach my $col (@$colnames) {
2100     my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
2101
2102     unless ($alias) {
2103       # see if the column was seen exactly once (so we know which rsrc it came from)
2104       if ($seen_cols{$colname} and @{$seen_cols{$colname}} == 1) {
2105         $alias = $seen_cols{$colname}[0];
2106       }
2107       else {
2108         next COLUMN;
2109       }
2110     }
2111
2112     my $rsrc = $alias2src->{$alias};
2113     $return{$col} = $rsrc && {
2114       %{$rsrc->column_info($colname)},
2115       -result_source => $rsrc,
2116       -source_alias => $alias,
2117     };
2118   }
2119
2120   return \%return;
2121 }
2122
2123 # Returns a counting SELECT for a simple count
2124 # query. Abstracted so that a storage could override
2125 # this to { count => 'firstcol' } or whatever makes
2126 # sense as a performance optimization
2127 sub _count_select {
2128   #my ($self, $source, $rs_attrs) = @_;
2129   return { count => '*' };
2130 }
2131
2132 # Returns a SELECT which will end up in the subselect
2133 # There may or may not be a group_by, as the subquery
2134 # might have been called to accomodate a limit
2135 #
2136 # Most databases would be happy with whatever ends up
2137 # here, but some choke in various ways.
2138 #
2139 sub _subq_count_select {
2140   my ($self, $source, $rs_attrs) = @_;
2141   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
2142
2143   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
2144   return @pcols ? \@pcols : [ 1 ];
2145 }
2146
2147 sub source_bind_attributes {
2148   my ($self, $source) = @_;
2149
2150   my $bind_attributes;
2151   foreach my $column ($source->columns) {
2152
2153     my $data_type = $source->column_info($column)->{data_type} || '';
2154     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2155      if $data_type;
2156   }
2157
2158   return $bind_attributes;
2159 }
2160
2161 =head2 select
2162
2163 =over 4
2164
2165 =item Arguments: $ident, $select, $condition, $attrs
2166
2167 =back
2168
2169 Handle a SQL select statement.
2170
2171 =cut
2172
2173 sub select {
2174   my $self = shift;
2175   my ($ident, $select, $condition, $attrs) = @_;
2176   return $self->cursor_class->new($self, \@_, $attrs);
2177 }
2178
2179 sub select_single {
2180   my $self = shift;
2181   my ($rv, $sth, @bind) = $self->_select(@_);
2182   my @row = $sth->fetchrow_array;
2183   my @nextrow = $sth->fetchrow_array if @row;
2184   if(@row && @nextrow) {
2185     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2186   }
2187   # Need to call finish() to work round broken DBDs
2188   $sth->finish();
2189   return @row;
2190 }
2191
2192 =head2 sth
2193
2194 =over 4
2195
2196 =item Arguments: $sql
2197
2198 =back
2199
2200 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2201
2202 =cut
2203
2204 sub _dbh_sth {
2205   my ($self, $dbh, $sql) = @_;
2206
2207   # 3 is the if_active parameter which avoids active sth re-use
2208   my $sth = $self->disable_sth_caching
2209     ? $dbh->prepare($sql)
2210     : $dbh->prepare_cached($sql, {}, 3);
2211
2212   # XXX You would think RaiseError would make this impossible,
2213   #  but apparently that's not true :(
2214   $self->throw_exception($dbh->errstr) if !$sth;
2215
2216   $sth;
2217 }
2218
2219 sub sth {
2220   my ($self, $sql) = @_;
2221   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2222 }
2223
2224 sub _dbh_columns_info_for {
2225   my ($self, $dbh, $table) = @_;
2226
2227   if ($dbh->can('column_info')) {
2228     my %result;
2229     eval {
2230       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2231       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2232       $sth->execute();
2233       while ( my $info = $sth->fetchrow_hashref() ){
2234         my %column_info;
2235         $column_info{data_type}   = $info->{TYPE_NAME};
2236         $column_info{size}      = $info->{COLUMN_SIZE};
2237         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2238         $column_info{default_value} = $info->{COLUMN_DEF};
2239         my $col_name = $info->{COLUMN_NAME};
2240         $col_name =~ s/^\"(.*)\"$/$1/;
2241
2242         $result{$col_name} = \%column_info;
2243       }
2244     };
2245     return \%result if !$@ && scalar keys %result;
2246   }
2247
2248   my %result;
2249   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2250   $sth->execute;
2251   my @columns = @{$sth->{NAME_lc}};
2252   for my $i ( 0 .. $#columns ){
2253     my %column_info;
2254     $column_info{data_type} = $sth->{TYPE}->[$i];
2255     $column_info{size} = $sth->{PRECISION}->[$i];
2256     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2257
2258     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2259       $column_info{data_type} = $1;
2260       $column_info{size}    = $2;
2261     }
2262
2263     $result{$columns[$i]} = \%column_info;
2264   }
2265   $sth->finish;
2266
2267   foreach my $col (keys %result) {
2268     my $colinfo = $result{$col};
2269     my $type_num = $colinfo->{data_type};
2270     my $type_name;
2271     if(defined $type_num && $dbh->can('type_info')) {
2272       my $type_info = $dbh->type_info($type_num);
2273       $type_name = $type_info->{TYPE_NAME} if $type_info;
2274       $colinfo->{data_type} = $type_name if $type_name;
2275     }
2276   }
2277
2278   return \%result;
2279 }
2280
2281 sub columns_info_for {
2282   my ($self, $table) = @_;
2283   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2284 }
2285
2286 =head2 last_insert_id
2287
2288 Return the row id of the last insert.
2289
2290 =cut
2291
2292 sub _dbh_last_insert_id {
2293     # All Storage's need to register their own _dbh_last_insert_id
2294     # the old SQLite-based method was highly inappropriate
2295
2296     my $self = shift;
2297     my $class = ref $self;
2298     $self->throw_exception (<<EOE);
2299
2300 No _dbh_last_insert_id() method found in $class.
2301 Since the method of obtaining the autoincrement id of the last insert
2302 operation varies greatly between different databases, this method must be
2303 individually implemented for every storage class.
2304 EOE
2305 }
2306
2307 sub last_insert_id {
2308   my $self = shift;
2309   $self->_dbh_last_insert_id ($self->_dbh, @_);
2310 }
2311
2312 =head2 _native_data_type
2313
2314 =over 4
2315
2316 =item Arguments: $type_name
2317
2318 =back
2319
2320 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2321 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2322 L<::Sybase|DBIx::Class::Storage::DBI::Sybase>.
2323
2324 The default implementation returns C<undef>, implement in your Storage driver if
2325 you need this functionality.
2326
2327 Should map types from other databases to the native RDBMS type, for example
2328 C<VARCHAR2> to C<VARCHAR>.
2329
2330 Types with modifiers should map to the underlying data type. For example,
2331 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2332
2333 Composite types should map to the container type, for example
2334 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2335
2336 =cut
2337
2338 sub _native_data_type {
2339   #my ($self, $data_type) = @_;
2340   return undef
2341 }
2342
2343 # Check if placeholders are supported at all
2344 sub _placeholders_supported {
2345   my $self = shift;
2346   my $dbh  = $self->_get_dbh;
2347
2348   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2349   # but it is inaccurate more often than not
2350   eval {
2351     local $dbh->{PrintError} = 0;
2352     local $dbh->{RaiseError} = 1;
2353     $dbh->do('select ?', {}, 1);
2354   };
2355   return $@ ? 0 : 1;
2356 }
2357
2358 # Check if placeholders bound to non-string types throw exceptions
2359 #
2360 sub _typeless_placeholders_supported {
2361   my $self = shift;
2362   my $dbh  = $self->_get_dbh;
2363
2364   eval {
2365     local $dbh->{PrintError} = 0;
2366     local $dbh->{RaiseError} = 1;
2367     # this specifically tests a bind that is NOT a string
2368     $dbh->do('select 1 where 1 = ?', {}, 1);
2369   };
2370   return $@ ? 0 : 1;
2371 }
2372
2373 =head2 sqlt_type
2374
2375 Returns the database driver name.
2376
2377 =cut
2378
2379 sub sqlt_type {
2380   shift->_get_dbh->{Driver}->{Name};
2381 }
2382
2383 =head2 bind_attribute_by_data_type
2384
2385 Given a datatype from column info, returns a database specific bind
2386 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2387 let the database planner just handle it.
2388
2389 Generally only needed for special case column types, like bytea in postgres.
2390
2391 =cut
2392
2393 sub bind_attribute_by_data_type {
2394     return;
2395 }
2396
2397 =head2 is_datatype_numeric
2398
2399 Given a datatype from column_info, returns a boolean value indicating if
2400 the current RDBMS considers it a numeric value. This controls how
2401 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2402 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2403 be performed instead of the usual C<eq>.
2404
2405 =cut
2406
2407 sub is_datatype_numeric {
2408   my ($self, $dt) = @_;
2409
2410   return 0 unless $dt;
2411
2412   return $dt =~ /^ (?:
2413     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2414   ) $/ix;
2415 }
2416
2417
2418 =head2 create_ddl_dir (EXPERIMENTAL)
2419
2420 =over 4
2421
2422 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2423
2424 =back
2425
2426 Creates a SQL file based on the Schema, for each of the specified
2427 database engines in C<\@databases> in the given directory.
2428 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2429
2430 Given a previous version number, this will also create a file containing
2431 the ALTER TABLE statements to transform the previous schema into the
2432 current one. Note that these statements may contain C<DROP TABLE> or
2433 C<DROP COLUMN> statements that can potentially destroy data.
2434
2435 The file names are created using the C<ddl_filename> method below, please
2436 override this method in your schema if you would like a different file
2437 name format. For the ALTER file, the same format is used, replacing
2438 $version in the name with "$preversion-$version".
2439
2440 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2441 The most common value for this would be C<< { add_drop_table => 1 } >>
2442 to have the SQL produced include a C<DROP TABLE> statement for each table
2443 created. For quoting purposes supply C<quote_table_names> and
2444 C<quote_field_names>.
2445
2446 If no arguments are passed, then the following default values are assumed:
2447
2448 =over 4
2449
2450 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2451
2452 =item version    - $schema->schema_version
2453
2454 =item directory  - './'
2455
2456 =item preversion - <none>
2457
2458 =back
2459
2460 By default, C<\%sqlt_args> will have
2461
2462  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2463
2464 merged with the hash passed in. To disable any of those features, pass in a
2465 hashref like the following
2466
2467  { ignore_constraint_names => 0, # ... other options }
2468
2469
2470 Note that this feature is currently EXPERIMENTAL and may not work correctly
2471 across all databases, or fully handle complex relationships.
2472
2473 WARNING: Please check all SQL files created, before applying them.
2474
2475 =cut
2476
2477 sub create_ddl_dir {
2478   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2479
2480   if(!$dir || !-d $dir) {
2481     carp "No directory given, using ./\n";
2482     $dir = "./";
2483   }
2484   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2485   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2486
2487   my $schema_version = $schema->schema_version || '1.x';
2488   $version ||= $schema_version;
2489
2490   $sqltargs = {
2491     add_drop_table => 1,
2492     ignore_constraint_names => 1,
2493     ignore_index_names => 1,
2494     %{$sqltargs || {}}
2495   };
2496
2497   $self->throw_exception("Can't create a ddl file without SQL::Translator: " . $self->_sqlt_version_error)
2498     if !$self->_sqlt_version_ok;
2499
2500   my $sqlt = SQL::Translator->new( $sqltargs );
2501
2502   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2503   my $sqlt_schema = $sqlt->translate({ data => $schema })
2504     or $self->throw_exception ($sqlt->error);
2505
2506   foreach my $db (@$databases) {
2507     $sqlt->reset();
2508     $sqlt->{schema} = $sqlt_schema;
2509     $sqlt->producer($db);
2510
2511     my $file;
2512     my $filename = $schema->ddl_filename($db, $version, $dir);
2513     if (-e $filename && ($version eq $schema_version )) {
2514       # if we are dumping the current version, overwrite the DDL
2515       carp "Overwriting existing DDL file - $filename";
2516       unlink($filename);
2517     }
2518
2519     my $output = $sqlt->translate;
2520     if(!$output) {
2521       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2522       next;
2523     }
2524     if(!open($file, ">$filename")) {
2525       $self->throw_exception("Can't open $filename for writing ($!)");
2526       next;
2527     }
2528     print $file $output;
2529     close($file);
2530
2531     next unless ($preversion);
2532
2533     require SQL::Translator::Diff;
2534
2535     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2536     if(!-e $prefilename) {
2537       carp("No previous schema file found ($prefilename)");
2538       next;
2539     }
2540
2541     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2542     if(-e $difffile) {
2543       carp("Overwriting existing diff file - $difffile");
2544       unlink($difffile);
2545     }
2546
2547     my $source_schema;
2548     {
2549       my $t = SQL::Translator->new($sqltargs);
2550       $t->debug( 0 );
2551       $t->trace( 0 );
2552
2553       $t->parser( $db )
2554         or $self->throw_exception ($t->error);
2555
2556       my $out = $t->translate( $prefilename )
2557         or $self->throw_exception ($t->error);
2558
2559       $source_schema = $t->schema;
2560
2561       $source_schema->name( $prefilename )
2562         unless ( $source_schema->name );
2563     }
2564
2565     # The "new" style of producers have sane normalization and can support
2566     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2567     # And we have to diff parsed SQL against parsed SQL.
2568     my $dest_schema = $sqlt_schema;
2569
2570     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2571       my $t = SQL::Translator->new($sqltargs);
2572       $t->debug( 0 );
2573       $t->trace( 0 );
2574
2575       $t->parser( $db )
2576         or $self->throw_exception ($t->error);
2577
2578       my $out = $t->translate( $filename )
2579         or $self->throw_exception ($t->error);
2580
2581       $dest_schema = $t->schema;
2582
2583       $dest_schema->name( $filename )
2584         unless $dest_schema->name;
2585     }
2586
2587     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2588                                                   $dest_schema,   $db,
2589                                                   $sqltargs
2590                                                  );
2591     if(!open $file, ">$difffile") {
2592       $self->throw_exception("Can't write to $difffile ($!)");
2593       next;
2594     }
2595     print $file $diff;
2596     close($file);
2597   }
2598 }
2599
2600 =head2 deployment_statements
2601
2602 =over 4
2603
2604 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2605
2606 =back
2607
2608 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2609
2610 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2611 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2612
2613 C<$directory> is used to return statements from files in a previously created
2614 L</create_ddl_dir> directory and is optional. The filenames are constructed
2615 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2616
2617 If no C<$directory> is specified then the statements are constructed on the
2618 fly using L<SQL::Translator> and C<$version> is ignored.
2619
2620 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2621
2622 =cut
2623
2624 sub deployment_statements {
2625   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2626   $type ||= $self->sqlt_type;
2627   $version ||= $schema->schema_version || '1.x';
2628   $dir ||= './';
2629   my $filename = $schema->ddl_filename($type, $version, $dir);
2630   if(-f $filename)
2631   {
2632       my $file;
2633       open($file, "<$filename")
2634         or $self->throw_exception("Can't open $filename ($!)");
2635       my @rows = <$file>;
2636       close($file);
2637       return join('', @rows);
2638   }
2639
2640   $self->throw_exception("Can't deploy without either SQL::Translator or a ddl_dir: " . $self->_sqlt_version_error )
2641     if !$self->_sqlt_version_ok;
2642
2643   # sources needs to be a parser arg, but for simplicty allow at top level
2644   # coming in
2645   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2646       if exists $sqltargs->{sources};
2647
2648   my $tr = SQL::Translator->new(
2649     producer => "SQL::Translator::Producer::${type}",
2650     %$sqltargs,
2651     parser => 'SQL::Translator::Parser::DBIx::Class',
2652     data => $schema,
2653   );
2654
2655   my $ret = $tr->translate
2656     or $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error);
2657
2658   return $ret;
2659 }
2660
2661 sub deploy {
2662   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2663   my $deploy = sub {
2664     my $line = shift;
2665     return if($line =~ /^--/);
2666     return if(!$line);
2667     # next if($line =~ /^DROP/m);
2668     return if($line =~ /^BEGIN TRANSACTION/m);
2669     return if($line =~ /^COMMIT/m);
2670     return if $line =~ /^\s+$/; # skip whitespace only
2671     $self->_query_start($line);
2672     eval {
2673       # do a dbh_do cycle here, as we need some error checking in
2674       # place (even though we will ignore errors)
2675       $self->dbh_do (sub { $_[1]->do($line) });
2676     };
2677     if ($@) {
2678       carp qq{$@ (running "${line}")};
2679     }
2680     $self->_query_end($line);
2681   };
2682   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2683   if (@statements > 1) {
2684     foreach my $statement (@statements) {
2685       $deploy->( $statement );
2686     }
2687   }
2688   elsif (@statements == 1) {
2689     foreach my $line ( split(";\n", $statements[0])) {
2690       $deploy->( $line );
2691     }
2692   }
2693 }
2694
2695 =head2 datetime_parser
2696
2697 Returns the datetime parser class
2698
2699 =cut
2700
2701 sub datetime_parser {
2702   my $self = shift;
2703   return $self->{datetime_parser} ||= do {
2704     $self->build_datetime_parser(@_);
2705   };
2706 }
2707
2708 =head2 datetime_parser_type
2709
2710 Defines (returns) the datetime parser class - currently hardwired to
2711 L<DateTime::Format::MySQL>
2712
2713 =cut
2714
2715 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2716
2717 =head2 build_datetime_parser
2718
2719 See L</datetime_parser>
2720
2721 =cut
2722
2723 sub build_datetime_parser {
2724   my $self = shift;
2725   my $type = $self->datetime_parser_type(@_);
2726   $self->ensure_class_loaded ($type);
2727   return $type;
2728 }
2729
2730
2731 =head2 is_replicating
2732
2733 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2734 replicate from a master database.  Default is undef, which is the result
2735 returned by databases that don't support replication.
2736
2737 =cut
2738
2739 sub is_replicating {
2740     return;
2741
2742 }
2743
2744 =head2 lag_behind_master
2745
2746 Returns a number that represents a certain amount of lag behind a master db
2747 when a given storage is replicating.  The number is database dependent, but
2748 starts at zero and increases with the amount of lag. Default in undef
2749
2750 =cut
2751
2752 sub lag_behind_master {
2753     return;
2754 }
2755
2756 # SQLT version handling
2757 {
2758   my $_sqlt_version_ok;     # private
2759   my $_sqlt_version_error;  # private
2760
2761   sub _sqlt_version_ok {
2762     if (!defined $_sqlt_version_ok) {
2763       eval "use SQL::Translator $minimum_sqlt_version";
2764       if ($@) {
2765         $_sqlt_version_ok = 0;
2766         $_sqlt_version_error = $@;
2767       }
2768       else {
2769         $_sqlt_version_ok = 1;
2770       }
2771     }
2772     return $_sqlt_version_ok;
2773   }
2774
2775   sub _sqlt_version_error {
2776     shift->_sqlt_version_ok unless defined $_sqlt_version_ok;
2777     return $_sqlt_version_error;
2778   }
2779
2780   sub _sqlt_minimum_version { $minimum_sqlt_version };
2781 }
2782
2783 sub DESTROY {
2784   my $self = shift;
2785
2786   $self->_verify_pid if $self->_dbh;
2787
2788   # some databases need this to stop spewing warnings
2789   if (my $dbh = $self->_dbh) {
2790     local $@;
2791     eval { $dbh->disconnect };
2792   }
2793
2794   $self->_dbh(undef);
2795 }
2796
2797 1;
2798
2799 =head1 USAGE NOTES
2800
2801 =head2 DBIx::Class and AutoCommit
2802
2803 DBIx::Class can do some wonderful magic with handling exceptions,
2804 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2805 (the default) combined with C<txn_do> for transaction support.
2806
2807 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2808 in an assumed transaction between commits, and you're telling us you'd
2809 like to manage that manually.  A lot of the magic protections offered by
2810 this module will go away.  We can't protect you from exceptions due to database
2811 disconnects because we don't know anything about how to restart your
2812 transactions.  You're on your own for handling all sorts of exceptional
2813 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2814 be with raw DBI.
2815
2816
2817 =head1 AUTHORS
2818
2819 Matt S. Trout <mst@shadowcatsystems.co.uk>
2820
2821 Andy Grundman <andy@hybridized.org>
2822
2823 =head1 LICENSE
2824
2825 You may distribute this code under the same terms as Perl itself.
2826
2827 =cut