86b7acae9cec7de5c338a9d94183ba458a519d92
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base 'DBIx::Class::Storage';
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17
18 # what version of sqlt do we require if deploy() without a ddl_dir is invoked
19 # when changing also adjust the corresponding author_require in Makefile.PL
20 my $minimum_sqlt_version = '0.11002';
21
22
23 __PACKAGE__->mk_group_accessors('simple' =>
24   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
25      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
26 );
27
28 # the values for these accessors are picked out (and deleted) from
29 # the attribute hashref passed to connect_info
30 my @storage_options = qw/
31   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
32   disable_sth_caching unsafe auto_savepoint
33 /;
34 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
35
36
37 # default cursor class, overridable in connect_info attributes
38 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
39
40 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
41 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
42
43
44 =head1 NAME
45
46 DBIx::Class::Storage::DBI - DBI storage handler
47
48 =head1 SYNOPSIS
49
50   my $schema = MySchema->connect('dbi:SQLite:my.db');
51
52   $schema->storage->debug(1);
53
54   my @stuff = $schema->storage->dbh_do(
55     sub {
56       my ($storage, $dbh, @args) = @_;
57       $dbh->do("DROP TABLE authors");
58     },
59     @column_list
60   );
61
62   $schema->resultset('Book')->search({
63      written_on => $schema->storage->datetime_parser(DateTime->now)
64   });
65
66 =head1 DESCRIPTION
67
68 This class represents the connection to an RDBMS via L<DBI>.  See
69 L<DBIx::Class::Storage> for general information.  This pod only
70 documents DBI-specific methods and behaviors.
71
72 =head1 METHODS
73
74 =cut
75
76 sub new {
77   my $new = shift->next::method(@_);
78
79   $new->transaction_depth(0);
80   $new->_sql_maker_opts({});
81   $new->{savepoints} = [];
82   $new->{_in_dbh_do} = 0;
83   $new->{_dbh_gen} = 0;
84
85   $new;
86 }
87
88 =head2 connect_info
89
90 This method is normally called by L<DBIx::Class::Schema/connection>, which
91 encapsulates its argument list in an arrayref before passing them here.
92
93 The argument list may contain:
94
95 =over
96
97 =item *
98
99 The same 4-element argument set one would normally pass to
100 L<DBI/connect>, optionally followed by
101 L<extra attributes|/DBIx::Class specific connection attributes>
102 recognized by DBIx::Class:
103
104   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
105
106 =item *
107
108 A single code reference which returns a connected
109 L<DBI database handle|DBI/connect> optionally followed by
110 L<extra attributes|/DBIx::Class specific connection attributes> recognized
111 by DBIx::Class:
112
113   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
114
115 =item *
116
117 A single hashref with all the attributes and the dsn/user/password
118 mixed together:
119
120   $connect_info_args = [{
121     dsn => $dsn,
122     user => $user,
123     password => $pass,
124     %dbi_attributes,
125     %extra_attributes,
126   }];
127
128   $connect_info_args = [{
129     dbh_maker => sub { DBI->connect (...) },
130     %dbi_attributes,
131     %extra_attributes,
132   }];
133
134 This is particularly useful for L<Catalyst> based applications, allowing the
135 following config (L<Config::General> style):
136
137   <Model::DB>
138     schema_class   App::DB
139     <connect_info>
140       dsn          dbi:mysql:database=test
141       user         testuser
142       password     TestPass
143       AutoCommit   1
144     </connect_info>
145   </Model::DB>
146
147 The C<dsn>/C<user>/C<password> combination can be substituted by the
148 C<dbh_maker> key whose value is a coderef that returns a connected
149 L<DBI database handle|DBI/connect>
150
151 =back
152
153 Please note that the L<DBI> docs recommend that you always explicitly
154 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
155 recommends that it be set to I<1>, and that you perform transactions
156 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
157 to I<1> if you do not do explicitly set it to zero.  This is the default
158 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
159
160 =head3 DBIx::Class specific connection attributes
161
162 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
163 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
164 the following connection options. These options can be mixed in with your other
165 L<DBI> connection attributes, or placed in a seperate hashref
166 (C<\%extra_attributes>) as shown above.
167
168 Every time C<connect_info> is invoked, any previous settings for
169 these options will be cleared before setting the new ones, regardless of
170 whether any options are specified in the new C<connect_info>.
171
172
173 =over
174
175 =item on_connect_do
176
177 Specifies things to do immediately after connecting or re-connecting to
178 the database.  Its value may contain:
179
180 =over
181
182 =item a scalar
183
184 This contains one SQL statement to execute.
185
186 =item an array reference
187
188 This contains SQL statements to execute in order.  Each element contains
189 a string or a code reference that returns a string.
190
191 =item a code reference
192
193 This contains some code to execute.  Unlike code references within an
194 array reference, its return value is ignored.
195
196 =back
197
198 =item on_disconnect_do
199
200 Takes arguments in the same form as L</on_connect_do> and executes them
201 immediately before disconnecting from the database.
202
203 Note, this only runs if you explicitly call L</disconnect> on the
204 storage object.
205
206 =item on_connect_call
207
208 A more generalized form of L</on_connect_do> that calls the specified
209 C<connect_call_METHOD> methods in your storage driver.
210
211   on_connect_do => 'select 1'
212
213 is equivalent to:
214
215   on_connect_call => [ [ do_sql => 'select 1' ] ]
216
217 Its values may contain:
218
219 =over
220
221 =item a scalar
222
223 Will call the C<connect_call_METHOD> method.
224
225 =item a code reference
226
227 Will execute C<< $code->($storage) >>
228
229 =item an array reference
230
231 Each value can be a method name or code reference.
232
233 =item an array of arrays
234
235 For each array, the first item is taken to be the C<connect_call_> method name
236 or code reference, and the rest are parameters to it.
237
238 =back
239
240 Some predefined storage methods you may use:
241
242 =over
243
244 =item do_sql
245
246 Executes a SQL string or a code reference that returns a SQL string. This is
247 what L</on_connect_do> and L</on_disconnect_do> use.
248
249 It can take:
250
251 =over
252
253 =item a scalar
254
255 Will execute the scalar as SQL.
256
257 =item an arrayref
258
259 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
260 attributes hashref and bind values.
261
262 =item a code reference
263
264 Will execute C<< $code->($storage) >> and execute the return array refs as
265 above.
266
267 =back
268
269 =item datetime_setup
270
271 Execute any statements necessary to initialize the database session to return
272 and accept datetime/timestamp values used with
273 L<DBIx::Class::InflateColumn::DateTime>.
274
275 Only necessary for some databases, see your specific storage driver for
276 implementation details.
277
278 =back
279
280 =item on_disconnect_call
281
282 Takes arguments in the same form as L</on_connect_call> and executes them
283 immediately before disconnecting from the database.
284
285 Calls the C<disconnect_call_METHOD> methods as opposed to the
286 C<connect_call_METHOD> methods called by L</on_connect_call>.
287
288 Note, this only runs if you explicitly call L</disconnect> on the
289 storage object.
290
291 =item disable_sth_caching
292
293 If set to a true value, this option will disable the caching of
294 statement handles via L<DBI/prepare_cached>.
295
296 =item limit_dialect
297
298 Sets the limit dialect. This is useful for JDBC-bridge among others
299 where the remote SQL-dialect cannot be determined by the name of the
300 driver alone. See also L<SQL::Abstract::Limit>.
301
302 =item quote_char
303
304 Specifies what characters to use to quote table and column names. If
305 you use this you will want to specify L</name_sep> as well.
306
307 C<quote_char> expects either a single character, in which case is it
308 is placed on either side of the table/column name, or an arrayref of length
309 2 in which case the table/column name is placed between the elements.
310
311 For example under MySQL you should use C<< quote_char => '`' >>, and for
312 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
313
314 =item name_sep
315
316 This only needs to be used in conjunction with C<quote_char>, and is used to
317 specify the charecter that seperates elements (schemas, tables, columns) from
318 each other. In most cases this is simply a C<.>.
319
320 The consequences of not supplying this value is that L<SQL::Abstract>
321 will assume DBIx::Class' uses of aliases to be complete column
322 names. The output will look like I<"me.name"> when it should actually
323 be I<"me"."name">.
324
325 =item unsafe
326
327 This Storage driver normally installs its own C<HandleError>, sets
328 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
329 all database handles, including those supplied by a coderef.  It does this
330 so that it can have consistent and useful error behavior.
331
332 If you set this option to a true value, Storage will not do its usual
333 modifications to the database handle's attributes, and instead relies on
334 the settings in your connect_info DBI options (or the values you set in
335 your connection coderef, in the case that you are connecting via coderef).
336
337 Note that your custom settings can cause Storage to malfunction,
338 especially if you set a C<HandleError> handler that suppresses exceptions
339 and/or disable C<RaiseError>.
340
341 =item auto_savepoint
342
343 If this option is true, L<DBIx::Class> will use savepoints when nesting
344 transactions, making it possible to recover from failure in the inner
345 transaction without having to abort all outer transactions.
346
347 =item cursor_class
348
349 Use this argument to supply a cursor class other than the default
350 L<DBIx::Class::Storage::DBI::Cursor>.
351
352 =back
353
354 Some real-life examples of arguments to L</connect_info> and
355 L<DBIx::Class::Schema/connect>
356
357   # Simple SQLite connection
358   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
359
360   # Connect via subref
361   ->connect_info([ sub { DBI->connect(...) } ]);
362
363   # Connect via subref in hashref
364   ->connect_info([{
365     dbh_maker => sub { DBI->connect(...) },
366     on_connect_do => 'alter session ...',
367   }]);
368
369   # A bit more complicated
370   ->connect_info(
371     [
372       'dbi:Pg:dbname=foo',
373       'postgres',
374       'my_pg_password',
375       { AutoCommit => 1 },
376       { quote_char => q{"}, name_sep => q{.} },
377     ]
378   );
379
380   # Equivalent to the previous example
381   ->connect_info(
382     [
383       'dbi:Pg:dbname=foo',
384       'postgres',
385       'my_pg_password',
386       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
387     ]
388   );
389
390   # Same, but with hashref as argument
391   # See parse_connect_info for explanation
392   ->connect_info(
393     [{
394       dsn         => 'dbi:Pg:dbname=foo',
395       user        => 'postgres',
396       password    => 'my_pg_password',
397       AutoCommit  => 1,
398       quote_char  => q{"},
399       name_sep    => q{.},
400     }]
401   );
402
403   # Subref + DBIx::Class-specific connection options
404   ->connect_info(
405     [
406       sub { DBI->connect(...) },
407       {
408           quote_char => q{`},
409           name_sep => q{@},
410           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
411           disable_sth_caching => 1,
412       },
413     ]
414   );
415
416
417
418 =cut
419
420 sub connect_info {
421   my ($self, $info_arg) = @_;
422
423   return $self->_connect_info if !$info_arg;
424
425   my @args = @$info_arg;  # take a shallow copy for further mutilation
426   $self->_connect_info([@args]); # copy for _connect_info
427
428
429   # combine/pre-parse arguments depending on invocation style
430
431   my %attrs;
432   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
433     %attrs = %{ $args[1] || {} };
434     @args = $args[0];
435   }
436   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
437     %attrs = %{$args[0]};
438     @args = ();
439     if (my $code = delete $attrs{dbh_maker}) {
440       @args = $code;
441
442       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
443       if (@ignored) {
444         carp sprintf (
445             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
446           . "to the result of 'dbh_maker'",
447
448           join (', ', map { "'$_'" } (@ignored) ),
449         );
450       }
451     }
452     else {
453       @args = delete @attrs{qw/dsn user password/};
454     }
455   }
456   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
457     %attrs = (
458       % { $args[3] || {} },
459       % { $args[4] || {} },
460     );
461     @args = @args[0,1,2];
462   }
463
464   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
465   #  the new set of options
466   $self->_sql_maker(undef);
467   $self->_sql_maker_opts({});
468
469   if(keys %attrs) {
470     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
471       if(my $value = delete $attrs{$storage_opt}) {
472         $self->$storage_opt($value);
473       }
474     }
475     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
476       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
477         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
478       }
479     }
480   }
481
482   if (ref $args[0] eq 'CODE') {
483     # _connect() never looks past $args[0] in this case
484     %attrs = ()
485   } else {
486     %attrs = (
487       %{ $self->_default_dbi_connect_attributes || {} },
488       %attrs,
489     );
490   }
491
492   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
493   $self->_connect_info;
494 }
495
496 sub _default_dbi_connect_attributes {
497   return {
498     AutoCommit => 1,
499     RaiseError => 1,
500     PrintError => 0,
501   };
502 }
503
504 =head2 on_connect_do
505
506 This method is deprecated in favour of setting via L</connect_info>.
507
508 =cut
509
510 =head2 on_disconnect_do
511
512 This method is deprecated in favour of setting via L</connect_info>.
513
514 =cut
515
516 sub _parse_connect_do {
517   my ($self, $type) = @_;
518
519   my $val = $self->$type;
520   return () if not defined $val;
521
522   my @res;
523
524   if (not ref($val)) {
525     push @res, [ 'do_sql', $val ];
526   } elsif (ref($val) eq 'CODE') {
527     push @res, $val;
528   } elsif (ref($val) eq 'ARRAY') {
529     push @res, map { [ 'do_sql', $_ ] } @$val;
530   } else {
531     $self->throw_exception("Invalid type for $type: ".ref($val));
532   }
533
534   return \@res;
535 }
536
537 =head2 dbh_do
538
539 Arguments: ($subref | $method_name), @extra_coderef_args?
540
541 Execute the given $subref or $method_name using the new exception-based
542 connection management.
543
544 The first two arguments will be the storage object that C<dbh_do> was called
545 on and a database handle to use.  Any additional arguments will be passed
546 verbatim to the called subref as arguments 2 and onwards.
547
548 Using this (instead of $self->_dbh or $self->dbh) ensures correct
549 exception handling and reconnection (or failover in future subclasses).
550
551 Your subref should have no side-effects outside of the database, as
552 there is the potential for your subref to be partially double-executed
553 if the database connection was stale/dysfunctional.
554
555 Example:
556
557   my @stuff = $schema->storage->dbh_do(
558     sub {
559       my ($storage, $dbh, @cols) = @_;
560       my $cols = join(q{, }, @cols);
561       $dbh->selectrow_array("SELECT $cols FROM foo");
562     },
563     @column_list
564   );
565
566 =cut
567
568 sub dbh_do {
569   my $self = shift;
570   my $code = shift;
571
572   my $dbh = $self->_get_dbh;
573
574   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
575       || $self->{transaction_depth};
576
577   local $self->{_in_dbh_do} = 1;
578
579   my @result;
580   my $want_array = wantarray;
581
582   eval {
583
584     if($want_array) {
585         @result = $self->$code($dbh, @_);
586     }
587     elsif(defined $want_array) {
588         $result[0] = $self->$code($dbh, @_);
589     }
590     else {
591         $self->$code($dbh, @_);
592     }
593   };
594
595   # ->connected might unset $@ - copy
596   my $exception = $@;
597   if(!$exception) { return $want_array ? @result : $result[0] }
598
599   $self->throw_exception($exception) if $self->connected;
600
601   # We were not connected - reconnect and retry, but let any
602   #  exception fall right through this time
603   carp "Retrying $code after catching disconnected exception: $exception"
604     if $ENV{DBIC_DBIRETRY_DEBUG};
605   $self->_populate_dbh;
606   $self->$code($self->_dbh, @_);
607 }
608
609 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
610 # It also informs dbh_do to bypass itself while under the direction of txn_do,
611 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
612 sub txn_do {
613   my $self = shift;
614   my $coderef = shift;
615
616   ref $coderef eq 'CODE' or $self->throw_exception
617     ('$coderef must be a CODE reference');
618
619   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
620
621   local $self->{_in_dbh_do} = 1;
622
623   my @result;
624   my $want_array = wantarray;
625
626   my $tried = 0;
627   while(1) {
628     eval {
629       $self->_get_dbh;
630
631       $self->txn_begin;
632       if($want_array) {
633           @result = $coderef->(@_);
634       }
635       elsif(defined $want_array) {
636           $result[0] = $coderef->(@_);
637       }
638       else {
639           $coderef->(@_);
640       }
641       $self->txn_commit;
642     };
643
644     # ->connected might unset $@ - copy
645     my $exception = $@;
646     if(!$exception) { return $want_array ? @result : $result[0] }
647
648     if($tried++ || $self->connected) {
649       eval { $self->txn_rollback };
650       my $rollback_exception = $@;
651       if($rollback_exception) {
652         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
653         $self->throw_exception($exception)  # propagate nested rollback
654           if $rollback_exception =~ /$exception_class/;
655
656         $self->throw_exception(
657           "Transaction aborted: ${exception}. "
658           . "Rollback failed: ${rollback_exception}"
659         );
660       }
661       $self->throw_exception($exception)
662     }
663
664     # We were not connected, and was first try - reconnect and retry
665     # via the while loop
666     carp "Retrying $coderef after catching disconnected exception: $exception"
667       if $ENV{DBIC_DBIRETRY_DEBUG};
668     $self->_populate_dbh;
669   }
670 }
671
672 =head2 disconnect
673
674 Our C<disconnect> method also performs a rollback first if the
675 database is not in C<AutoCommit> mode.
676
677 =cut
678
679 sub disconnect {
680   my ($self) = @_;
681
682   if( $self->_dbh ) {
683     my @actions;
684
685     push @actions, ( $self->on_disconnect_call || () );
686     push @actions, $self->_parse_connect_do ('on_disconnect_do');
687
688     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
689
690     $self->_dbh_rollback unless $self->_dbh_autocommit;
691
692     $self->_dbh->disconnect;
693     $self->_dbh(undef);
694     $self->{_dbh_gen}++;
695   }
696 }
697
698 =head2 with_deferred_fk_checks
699
700 =over 4
701
702 =item Arguments: C<$coderef>
703
704 =item Return Value: The return value of $coderef
705
706 =back
707
708 Storage specific method to run the code ref with FK checks deferred or
709 in MySQL's case disabled entirely.
710
711 =cut
712
713 # Storage subclasses should override this
714 sub with_deferred_fk_checks {
715   my ($self, $sub) = @_;
716
717   $sub->();
718 }
719
720 =head2 connected
721
722 =over
723
724 =item Arguments: none
725
726 =item Return Value: 1|0
727
728 =back
729
730 Verifies that the the current database handle is active and ready to execute
731 an SQL statement (i.e. the connection did not get stale, server is still
732 answering, etc.) This method is used internally by L</dbh>.
733
734 =cut
735
736 sub connected {
737   my $self = shift;
738   return 0 unless $self->_seems_connected;
739
740   #be on the safe side
741   local $self->_dbh->{RaiseError} = 1;
742
743   return $self->_ping;
744 }
745
746 sub _seems_connected {
747   my $self = shift;
748
749   my $dbh = $self->_dbh
750     or return 0;
751
752   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
753     $self->_dbh(undef);
754     $self->{_dbh_gen}++;
755     return 0;
756   }
757   else {
758     $self->_verify_pid;
759     return 0 if !$self->_dbh;
760   }
761
762   return $dbh->FETCH('Active');
763 }
764
765 sub _ping {
766   my $self = shift;
767
768   my $dbh = $self->_dbh or return 0;
769
770   return $dbh->ping;
771 }
772
773 # handle pid changes correctly
774 #  NOTE: assumes $self->_dbh is a valid $dbh
775 sub _verify_pid {
776   my ($self) = @_;
777
778   return if defined $self->_conn_pid && $self->_conn_pid == $$;
779
780   $self->_dbh->{InactiveDestroy} = 1;
781   $self->_dbh(undef);
782   $self->{_dbh_gen}++;
783
784   return;
785 }
786
787 sub ensure_connected {
788   my ($self) = @_;
789
790   unless ($self->connected) {
791     $self->_populate_dbh;
792   }
793 }
794
795 =head2 dbh
796
797 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
798 is guaranteed to be healthy by implicitly calling L</connected>, and if
799 necessary performing a reconnection before returning. Keep in mind that this
800 is very B<expensive> on some database engines. Consider using L<dbh_do>
801 instead.
802
803 =cut
804
805 sub dbh {
806   my ($self) = @_;
807
808   if (not $self->_dbh) {
809     $self->_populate_dbh;
810   } else {
811     $self->ensure_connected;
812   }
813   return $self->_dbh;
814 }
815
816 # this is the internal "get dbh or connect (don't check)" method
817 sub _get_dbh {
818   my $self = shift;
819   $self->_verify_pid if $self->_dbh;
820   $self->_populate_dbh unless $self->_dbh;
821   return $self->_dbh;
822 }
823
824 sub _sql_maker_args {
825     my ($self) = @_;
826
827     return (
828       bindtype=>'columns',
829       array_datatypes => 1,
830       limit_dialect => $self->_get_dbh,
831       %{$self->_sql_maker_opts}
832     );
833 }
834
835 sub sql_maker {
836   my ($self) = @_;
837   unless ($self->_sql_maker) {
838     my $sql_maker_class = $self->sql_maker_class;
839     $self->ensure_class_loaded ($sql_maker_class);
840     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
841   }
842   return $self->_sql_maker;
843 }
844
845 # nothing to do by default
846 sub _rebless {}
847 sub _init {}
848
849 sub _populate_dbh {
850   my ($self) = @_;
851
852   my @info = @{$self->_dbi_connect_info || []};
853   $self->_dbh(undef); # in case ->connected failed we might get sent here
854   $self->_dbh($self->_connect(@info));
855
856   $self->_conn_pid($$);
857   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
858
859   $self->_determine_driver;
860
861   # Always set the transaction depth on connect, since
862   #  there is no transaction in progress by definition
863   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
864
865   $self->_run_connection_actions unless $self->{_in_determine_driver};
866 }
867
868 sub _run_connection_actions {
869   my $self = shift;
870   my @actions;
871
872   push @actions, ( $self->on_connect_call || () );
873   push @actions, $self->_parse_connect_do ('on_connect_do');
874
875   $self->_do_connection_actions(connect_call_ => $_) for @actions;
876 }
877
878 sub _determine_driver {
879   my ($self) = @_;
880
881   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
882     my $started_connected = 0;
883     local $self->{_in_determine_driver} = 1;
884
885     if (ref($self) eq __PACKAGE__) {
886       my $driver;
887       if ($self->_dbh) { # we are connected
888         $driver = $self->_dbh->{Driver}{Name};
889         $started_connected = 1;
890       } else {
891         # if connect_info is a CODEREF, we have no choice but to connect
892         if (ref $self->_dbi_connect_info->[0] &&
893             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
894           $self->_populate_dbh;
895           $driver = $self->_dbh->{Driver}{Name};
896         }
897         else {
898           # try to use dsn to not require being connected, the driver may still
899           # force a connection in _rebless to determine version
900           ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
901         }
902       }
903
904       my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
905       if ($self->load_optional_class($storage_class)) {
906         mro::set_mro($storage_class, 'c3');
907         bless $self, $storage_class;
908         $self->_rebless();
909       }
910     }
911
912     $self->_driver_determined(1);
913
914     $self->_init; # run driver-specific initializations
915
916     $self->_run_connection_actions
917         if !$started_connected && defined $self->_dbh;
918   }
919 }
920
921 sub _do_connection_actions {
922   my $self          = shift;
923   my $method_prefix = shift;
924   my $call          = shift;
925
926   if (not ref($call)) {
927     my $method = $method_prefix . $call;
928     $self->$method(@_);
929   } elsif (ref($call) eq 'CODE') {
930     $self->$call(@_);
931   } elsif (ref($call) eq 'ARRAY') {
932     if (ref($call->[0]) ne 'ARRAY') {
933       $self->_do_connection_actions($method_prefix, $_) for @$call;
934     } else {
935       $self->_do_connection_actions($method_prefix, @$_) for @$call;
936     }
937   } else {
938     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
939   }
940
941   return $self;
942 }
943
944 sub connect_call_do_sql {
945   my $self = shift;
946   $self->_do_query(@_);
947 }
948
949 sub disconnect_call_do_sql {
950   my $self = shift;
951   $self->_do_query(@_);
952 }
953
954 # override in db-specific backend when necessary
955 sub connect_call_datetime_setup { 1 }
956
957 sub _do_query {
958   my ($self, $action) = @_;
959
960   if (ref $action eq 'CODE') {
961     $action = $action->($self);
962     $self->_do_query($_) foreach @$action;
963   }
964   else {
965     # Most debuggers expect ($sql, @bind), so we need to exclude
966     # the attribute hash which is the second argument to $dbh->do
967     # furthermore the bind values are usually to be presented
968     # as named arrayref pairs, so wrap those here too
969     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
970     my $sql = shift @do_args;
971     my $attrs = shift @do_args;
972     my @bind = map { [ undef, $_ ] } @do_args;
973
974     $self->_query_start($sql, @bind);
975     $self->_get_dbh->do($sql, $attrs, @do_args);
976     $self->_query_end($sql, @bind);
977   }
978
979   return $self;
980 }
981
982 sub _connect {
983   my ($self, @info) = @_;
984
985   $self->throw_exception("You failed to provide any connection info")
986     if !@info;
987
988   my ($old_connect_via, $dbh);
989
990   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
991     $old_connect_via = $DBI::connect_via;
992     $DBI::connect_via = 'connect';
993   }
994
995   eval {
996     if(ref $info[0] eq 'CODE') {
997        $dbh = &{$info[0]}
998     }
999     else {
1000        $dbh = DBI->connect(@info);
1001     }
1002
1003     if($dbh && !$self->unsafe) {
1004       my $weak_self = $self;
1005       Scalar::Util::weaken($weak_self);
1006       $dbh->{HandleError} = sub {
1007           if ($weak_self) {
1008             $weak_self->throw_exception("DBI Exception: $_[0]");
1009           }
1010           else {
1011             # the handler may be invoked by something totally out of
1012             # the scope of DBIC
1013             croak ("DBI Exception: $_[0]");
1014           }
1015       };
1016       $dbh->{ShowErrorStatement} = 1;
1017       $dbh->{RaiseError} = 1;
1018       $dbh->{PrintError} = 0;
1019     }
1020   };
1021
1022   $DBI::connect_via = $old_connect_via if $old_connect_via;
1023
1024   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1025     if !$dbh || $@;
1026
1027   $self->_dbh_autocommit($dbh->{AutoCommit});
1028
1029   $dbh;
1030 }
1031
1032 sub svp_begin {
1033   my ($self, $name) = @_;
1034
1035   $name = $self->_svp_generate_name
1036     unless defined $name;
1037
1038   $self->throw_exception ("You can't use savepoints outside a transaction")
1039     if $self->{transaction_depth} == 0;
1040
1041   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1042     unless $self->can('_svp_begin');
1043
1044   push @{ $self->{savepoints} }, $name;
1045
1046   $self->debugobj->svp_begin($name) if $self->debug;
1047
1048   return $self->_svp_begin($name);
1049 }
1050
1051 sub svp_release {
1052   my ($self, $name) = @_;
1053
1054   $self->throw_exception ("You can't use savepoints outside a transaction")
1055     if $self->{transaction_depth} == 0;
1056
1057   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1058     unless $self->can('_svp_release');
1059
1060   if (defined $name) {
1061     $self->throw_exception ("Savepoint '$name' does not exist")
1062       unless grep { $_ eq $name } @{ $self->{savepoints} };
1063
1064     # Dig through the stack until we find the one we are releasing.  This keeps
1065     # the stack up to date.
1066     my $svp;
1067
1068     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1069   } else {
1070     $name = pop @{ $self->{savepoints} };
1071   }
1072
1073   $self->debugobj->svp_release($name) if $self->debug;
1074
1075   return $self->_svp_release($name);
1076 }
1077
1078 sub svp_rollback {
1079   my ($self, $name) = @_;
1080
1081   $self->throw_exception ("You can't use savepoints outside a transaction")
1082     if $self->{transaction_depth} == 0;
1083
1084   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1085     unless $self->can('_svp_rollback');
1086
1087   if (defined $name) {
1088       # If they passed us a name, verify that it exists in the stack
1089       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1090           $self->throw_exception("Savepoint '$name' does not exist!");
1091       }
1092
1093       # Dig through the stack until we find the one we are releasing.  This keeps
1094       # the stack up to date.
1095       while(my $s = pop(@{ $self->{savepoints} })) {
1096           last if($s eq $name);
1097       }
1098       # Add the savepoint back to the stack, as a rollback doesn't remove the
1099       # named savepoint, only everything after it.
1100       push(@{ $self->{savepoints} }, $name);
1101   } else {
1102       # We'll assume they want to rollback to the last savepoint
1103       $name = $self->{savepoints}->[-1];
1104   }
1105
1106   $self->debugobj->svp_rollback($name) if $self->debug;
1107
1108   return $self->_svp_rollback($name);
1109 }
1110
1111 sub _svp_generate_name {
1112     my ($self) = @_;
1113
1114     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1115 }
1116
1117 sub txn_begin {
1118   my $self = shift;
1119   if($self->{transaction_depth} == 0) {
1120     $self->debugobj->txn_begin()
1121       if $self->debug;
1122     $self->_dbh_begin_work;
1123   }
1124   elsif ($self->auto_savepoint) {
1125     $self->svp_begin;
1126   }
1127   $self->{transaction_depth}++;
1128 }
1129
1130 sub _dbh_begin_work {
1131   my $self = shift;
1132
1133   # if the user is utilizing txn_do - good for him, otherwise we need to
1134   # ensure that the $dbh is healthy on BEGIN.
1135   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1136   # will be replaced by a failure of begin_work itself (which will be
1137   # then retried on reconnect)
1138   if ($self->{_in_dbh_do}) {
1139     $self->_dbh->begin_work;
1140   } else {
1141     $self->dbh_do(sub { $_[1]->begin_work });
1142   }
1143 }
1144
1145 sub txn_commit {
1146   my $self = shift;
1147   if ($self->{transaction_depth} == 1) {
1148     my $dbh = $self->_dbh;
1149     $self->debugobj->txn_commit()
1150       if ($self->debug);
1151     $self->_dbh_commit;
1152     $self->{transaction_depth} = 0
1153       if $self->_dbh_autocommit;
1154   }
1155   elsif($self->{transaction_depth} > 1) {
1156     $self->{transaction_depth}--;
1157     $self->svp_release
1158       if $self->auto_savepoint;
1159   }
1160 }
1161
1162 sub _dbh_commit {
1163   my $self = shift;
1164   $self->_dbh->commit;
1165 }
1166
1167 sub txn_rollback {
1168   my $self = shift;
1169   my $dbh = $self->_dbh;
1170   eval {
1171     if ($self->{transaction_depth} == 1) {
1172       $self->debugobj->txn_rollback()
1173         if ($self->debug);
1174       $self->{transaction_depth} = 0
1175         if $self->_dbh_autocommit;
1176       $self->_dbh_rollback;
1177     }
1178     elsif($self->{transaction_depth} > 1) {
1179       $self->{transaction_depth}--;
1180       if ($self->auto_savepoint) {
1181         $self->svp_rollback;
1182         $self->svp_release;
1183       }
1184     }
1185     else {
1186       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1187     }
1188   };
1189   if ($@) {
1190     my $error = $@;
1191     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1192     $error =~ /$exception_class/ and $self->throw_exception($error);
1193     # ensure that a failed rollback resets the transaction depth
1194     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1195     $self->throw_exception($error);
1196   }
1197 }
1198
1199 sub _dbh_rollback {
1200   my $self = shift;
1201   $self->_dbh->rollback;
1202 }
1203
1204 # This used to be the top-half of _execute.  It was split out to make it
1205 #  easier to override in NoBindVars without duping the rest.  It takes up
1206 #  all of _execute's args, and emits $sql, @bind.
1207 sub _prep_for_execute {
1208   my ($self, $op, $extra_bind, $ident, $args) = @_;
1209
1210   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1211     $ident = $ident->from();
1212   }
1213
1214   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1215
1216   unshift(@bind,
1217     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1218       if $extra_bind;
1219   return ($sql, \@bind);
1220 }
1221
1222
1223 sub _fix_bind_params {
1224     my ($self, @bind) = @_;
1225
1226     ### Turn @bind from something like this:
1227     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1228     ### to this:
1229     ###   ( "'1'", "'1'", "'3'" )
1230     return
1231         map {
1232             if ( defined( $_ && $_->[1] ) ) {
1233                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1234             }
1235             else { q{'NULL'}; }
1236         } @bind;
1237 }
1238
1239 sub _query_start {
1240     my ( $self, $sql, @bind ) = @_;
1241
1242     if ( $self->debug ) {
1243         @bind = $self->_fix_bind_params(@bind);
1244
1245         $self->debugobj->query_start( $sql, @bind );
1246     }
1247 }
1248
1249 sub _query_end {
1250     my ( $self, $sql, @bind ) = @_;
1251
1252     if ( $self->debug ) {
1253         @bind = $self->_fix_bind_params(@bind);
1254         $self->debugobj->query_end( $sql, @bind );
1255     }
1256 }
1257
1258 sub _dbh_execute {
1259   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1260
1261   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1262
1263   $self->_query_start( $sql, @$bind );
1264
1265   my $sth = $self->sth($sql,$op);
1266
1267   my $placeholder_index = 1;
1268
1269   foreach my $bound (@$bind) {
1270     my $attributes = {};
1271     my($column_name, @data) = @$bound;
1272
1273     if ($bind_attributes) {
1274       $attributes = $bind_attributes->{$column_name}
1275       if defined $bind_attributes->{$column_name};
1276     }
1277
1278     foreach my $data (@data) {
1279       my $ref = ref $data;
1280       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1281
1282       $sth->bind_param($placeholder_index, $data, $attributes);
1283       $placeholder_index++;
1284     }
1285   }
1286
1287   # Can this fail without throwing an exception anyways???
1288   my $rv = $sth->execute();
1289   $self->throw_exception($sth->errstr) if !$rv;
1290
1291   $self->_query_end( $sql, @$bind );
1292
1293   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1294 }
1295
1296 sub _execute {
1297     my $self = shift;
1298     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1299 }
1300
1301 sub insert {
1302   my ($self, $source, $to_insert) = @_;
1303
1304 # redispatch to insert method of storage we reblessed into, if necessary
1305   if (not $self->_driver_determined) {
1306     $self->_determine_driver;
1307     goto $self->can('insert');
1308   }
1309
1310   my $ident = $source->from;
1311   my $bind_attributes = $self->source_bind_attributes($source);
1312
1313   my $updated_cols = {};
1314
1315   foreach my $col ( $source->columns ) {
1316     if ( !defined $to_insert->{$col} ) {
1317       my $col_info = $source->column_info($col);
1318
1319       if ( $col_info->{auto_nextval} ) {
1320         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1321           'nextval',
1322           $col_info->{sequence} ||
1323             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
1324         );
1325       }
1326     }
1327   }
1328
1329   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1330
1331   return $updated_cols;
1332 }
1333
1334 ## Still not quite perfect, and EXPERIMENTAL
1335 ## Currently it is assumed that all values passed will be "normal", i.e. not
1336 ## scalar refs, or at least, all the same type as the first set, the statement is
1337 ## only prepped once.
1338 sub insert_bulk {
1339   my ($self, $source, $cols, $data) = @_;
1340
1341 # redispatch to insert_bulk method of storage we reblessed into, if necessary
1342   if (not $self->_driver_determined) {
1343     $self->_determine_driver;
1344     goto $self->can('insert_bulk');
1345   }
1346
1347   my %colvalues;
1348   @colvalues{@$cols} = (0..$#$cols);
1349
1350   for my $i (0..$#$cols) {
1351     my $first_val = $data->[0][$i];
1352     next unless ref $first_val eq 'SCALAR';
1353
1354     $colvalues{ $cols->[$i] } = $first_val;
1355 ## This is probably unnecessary since $rs->populate only looks at the first
1356 ## slice anyway.
1357 #      if (grep {
1358 #        ref $_ eq 'SCALAR' && $$_ eq $$first_val
1359 #      } map $data->[$_][$i], (1..$#$data)) == (@$data - 1);
1360   }
1361
1362   # check for bad data
1363   my $bad_slice = sub {
1364     my ($msg, $col_idx, $slice_idx) = @_;
1365     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1366       $msg,
1367       $cols->[$col_idx],
1368       Data::Dumper::Concise::Dumper({
1369         map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1370       }),
1371     );
1372   };
1373
1374   for my $datum_idx (0..$#$data) {
1375     my $datum = $data->[$datum_idx];
1376
1377     for my $col_idx (0..$#$cols) {
1378       my $val            = $datum->[$col_idx];
1379       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1380       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1381
1382       if ($is_literal_sql) {
1383         if (not ref $val) {
1384           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1385         }
1386         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1387           $bad_slice->("$reftype reference found where literal SQL expected",
1388             $col_idx, $datum_idx);
1389         }
1390         elsif ($$val ne $$sqla_bind){
1391           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1392             $col_idx, $datum_idx);
1393         }
1394       }
1395       elsif (my $reftype = ref $val) {
1396         $bad_slice->("$reftype reference found where bind expected",
1397           $col_idx, $datum_idx);
1398       }
1399     }
1400   }
1401
1402   my ($sql, $bind) = $self->_prep_for_execute (
1403     'insert', undef, $source, [\%colvalues]
1404   );
1405   my @bind = @$bind;
1406
1407   my $empty_bind = 1 if (not @bind) &&
1408     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1409
1410   if ((not @bind) && (not $empty_bind)) {
1411     $self->throw_exception(
1412       'Cannot insert_bulk without support for placeholders'
1413     );
1414   }
1415
1416   $self->_query_start( $sql, ['__BULK__'] );
1417   my $sth = $self->sth($sql);
1418
1419   my $rv = do {
1420     if ($empty_bind) {
1421       # bind_param_array doesn't work if there are no binds
1422       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1423     }
1424     else {
1425 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1426       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1427     }
1428   };
1429
1430   $self->_query_end( $sql, ['__BULK__'] );
1431
1432   return (wantarray ? ($rv, $sth, @bind) : $rv);
1433 }
1434
1435 sub _execute_array {
1436   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1437
1438   my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
1439
1440   ## This must be an arrayref, else nothing works!
1441   my $tuple_status = [];
1442
1443   ## Get the bind_attributes, if any exist
1444   my $bind_attributes = $self->source_bind_attributes($source);
1445
1446   ## Bind the values and execute
1447   my $placeholder_index = 1;
1448
1449   foreach my $bound (@$bind) {
1450
1451     my $attributes = {};
1452     my ($column_name, $data_index) = @$bound;
1453
1454     if( $bind_attributes ) {
1455       $attributes = $bind_attributes->{$column_name}
1456       if defined $bind_attributes->{$column_name};
1457     }
1458
1459     my @data = map { $_->[$data_index] } @$data;
1460
1461     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1462     $placeholder_index++;
1463   }
1464
1465   my $rv = eval {
1466     $self->_dbh_execute_array($sth, $tuple_status, @extra);
1467   };
1468   my $err = $@ || $sth->errstr;
1469
1470 # Statement must finish even if there was an exception.
1471   eval { $sth->finish };
1472   $err = $@ unless $err;
1473
1474   if ($err) {
1475     my $i = 0;
1476     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1477
1478     $self->throw_exception("Unexpected populate error: $err")
1479       if ($i > $#$tuple_status);
1480
1481     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1482       ($tuple_status->[$i][1] || $err),
1483       Data::Dumper::Concise::Dumper({
1484         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1485       }),
1486     );
1487   }
1488
1489   $guard->commit if $guard;
1490
1491   return $rv;
1492 }
1493
1494 sub _dbh_execute_array {
1495     my ($self, $sth, $tuple_status, @extra) = @_;
1496
1497     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1498 }
1499
1500 sub _dbh_execute_inserts_with_no_binds {
1501   my ($self, $sth, $count) = @_;
1502
1503   my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
1504
1505   eval {
1506     my $dbh = $self->_get_dbh;
1507     local $dbh->{RaiseError} = 1;
1508     local $dbh->{PrintError} = 0;
1509
1510     $sth->execute foreach 1..$count;
1511   };
1512   my $exception = $@;
1513
1514 # Make sure statement is finished even if there was an exception.
1515   eval { $sth->finish };
1516   $exception = $@ unless $exception;
1517
1518   $self->throw_exception($exception) if $exception;
1519
1520   $guard->commit if $guard;
1521
1522   return $count;
1523 }
1524
1525 sub update {
1526   my ($self, $source, @args) = @_; 
1527
1528 # redispatch to update method of storage we reblessed into, if necessary
1529   if (not $self->_driver_determined) {
1530     $self->_determine_driver;
1531     goto $self->can('update');
1532   }
1533
1534   my $bind_attributes = $self->source_bind_attributes($source);
1535
1536   return $self->_execute('update' => [], $source, $bind_attributes, @args);
1537 }
1538
1539
1540 sub delete {
1541   my $self = shift @_;
1542   my $source = shift @_;
1543   $self->_determine_driver;
1544   my $bind_attrs = $self->source_bind_attributes($source);
1545
1546   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1547 }
1548
1549 # We were sent here because the $rs contains a complex search
1550 # which will require a subquery to select the correct rows
1551 # (i.e. joined or limited resultsets)
1552 #
1553 # Genarating a single PK column subquery is trivial and supported
1554 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1555 # Look at _multipk_update_delete()
1556 sub _subq_update_delete {
1557   my $self = shift;
1558   my ($rs, $op, $values) = @_;
1559
1560   my $rsrc = $rs->result_source;
1561
1562   # we already check this, but double check naively just in case. Should be removed soon
1563   my $sel = $rs->_resolved_attrs->{select};
1564   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1565   my @pcols = $rsrc->primary_columns;
1566   if (@$sel != @pcols) {
1567     $self->throw_exception (
1568       'Subquery update/delete can not be called on resultsets selecting a'
1569      .' number of columns different than the number of primary keys'
1570     );
1571   }
1572
1573   if (@pcols == 1) {
1574     return $self->$op (
1575       $rsrc,
1576       $op eq 'update' ? $values : (),
1577       { $pcols[0] => { -in => $rs->as_query } },
1578     );
1579   }
1580
1581   else {
1582     return $self->_multipk_update_delete (@_);
1583   }
1584 }
1585
1586 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1587 # resultset update/delete involving subqueries. So by default resort
1588 # to simple (and inefficient) delete_all style per-row opearations,
1589 # while allowing specific storages to override this with a faster
1590 # implementation.
1591 #
1592 sub _multipk_update_delete {
1593   return shift->_per_row_update_delete (@_);
1594 }
1595
1596 # This is the default loop used to delete/update rows for multi PK
1597 # resultsets, and used by mysql exclusively (because it can't do anything
1598 # else).
1599 #
1600 # We do not use $row->$op style queries, because resultset update/delete
1601 # is not expected to cascade (this is what delete_all/update_all is for).
1602 #
1603 # There should be no race conditions as the entire operation is rolled
1604 # in a transaction.
1605 #
1606 sub _per_row_update_delete {
1607   my $self = shift;
1608   my ($rs, $op, $values) = @_;
1609
1610   my $rsrc = $rs->result_source;
1611   my @pcols = $rsrc->primary_columns;
1612
1613   my $guard = $self->txn_scope_guard;
1614
1615   # emulate the return value of $sth->execute for non-selects
1616   my $row_cnt = '0E0';
1617
1618   my $subrs_cur = $rs->cursor;
1619   while (my @pks = $subrs_cur->next) {
1620
1621     my $cond;
1622     for my $i (0.. $#pcols) {
1623       $cond->{$pcols[$i]} = $pks[$i];
1624     }
1625
1626     $self->$op (
1627       $rsrc,
1628       $op eq 'update' ? $values : (),
1629       $cond,
1630     );
1631
1632     $row_cnt++;
1633   }
1634
1635   $guard->commit;
1636
1637   return $row_cnt;
1638 }
1639
1640 sub _select {
1641   my $self = shift;
1642
1643   # localization is neccessary as
1644   # 1) there is no infrastructure to pass this around before SQLA2
1645   # 2) _select_args sets it and _prep_for_execute consumes it
1646   my $sql_maker = $self->sql_maker;
1647   local $sql_maker->{_dbic_rs_attrs};
1648
1649   return $self->_execute($self->_select_args(@_));
1650 }
1651
1652 sub _select_args_to_query {
1653   my $self = shift;
1654
1655   # localization is neccessary as
1656   # 1) there is no infrastructure to pass this around before SQLA2
1657   # 2) _select_args sets it and _prep_for_execute consumes it
1658   my $sql_maker = $self->sql_maker;
1659   local $sql_maker->{_dbic_rs_attrs};
1660
1661   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1662   #  = $self->_select_args($ident, $select, $cond, $attrs);
1663   my ($op, $bind, $ident, $bind_attrs, @args) =
1664     $self->_select_args(@_);
1665
1666   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1667   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1668   $prepared_bind ||= [];
1669
1670   return wantarray
1671     ? ($sql, $prepared_bind, $bind_attrs)
1672     : \[ "($sql)", @$prepared_bind ]
1673   ;
1674 }
1675
1676 sub _select_args {
1677   my ($self, $ident, $select, $where, $attrs) = @_;
1678
1679   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1680
1681   my $sql_maker = $self->sql_maker;
1682   $sql_maker->{_dbic_rs_attrs} = {
1683     %$attrs,
1684     select => $select,
1685     from => $ident,
1686     where => $where,
1687     $rs_alias
1688       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1689       : ()
1690     ,
1691   };
1692
1693   # calculate bind_attrs before possible $ident mangling
1694   my $bind_attrs = {};
1695   for my $alias (keys %$alias2source) {
1696     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1697     for my $col (keys %$bindtypes) {
1698
1699       my $fqcn = join ('.', $alias, $col);
1700       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1701
1702       # Unqialified column names are nice, but at the same time can be
1703       # rather ambiguous. What we do here is basically go along with
1704       # the loop, adding an unqualified column slot to $bind_attrs,
1705       # alongside the fully qualified name. As soon as we encounter
1706       # another column by that name (which would imply another table)
1707       # we unset the unqualified slot and never add any info to it
1708       # to avoid erroneous type binding. If this happens the users
1709       # only choice will be to fully qualify his column name
1710
1711       if (exists $bind_attrs->{$col}) {
1712         $bind_attrs->{$col} = {};
1713       }
1714       else {
1715         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1716       }
1717     }
1718   }
1719
1720   # adjust limits
1721   if (
1722     $attrs->{software_limit}
1723       ||
1724     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1725   ) {
1726     $attrs->{software_limit} = 1;
1727   }
1728   else {
1729     $self->throw_exception("rows attribute must be positive if present")
1730       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1731
1732     # MySQL actually recommends this approach.  I cringe.
1733     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1734   }
1735
1736   my @limit;
1737
1738   # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
1739   # otherwise delegate the limiting to the storage, unless software limit was requested
1740   if (
1741     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1742        ||
1743     ( $attrs->{group_by} && @{$attrs->{group_by}} &&
1744       $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
1745   ) {
1746     ($ident, $select, $where, $attrs)
1747       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1748   }
1749   elsif (! $attrs->{software_limit} ) {
1750     push @limit, $attrs->{rows}, $attrs->{offset};
1751   }
1752
1753 ###
1754   # This would be the point to deflate anything found in $where
1755   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1756   # expect a row object. And all we have is a resultsource (it is trivial
1757   # to extract deflator coderefs via $alias2source above).
1758   #
1759   # I don't see a way forward other than changing the way deflators are
1760   # invoked, and that's just bad...
1761 ###
1762
1763   my $order = { map
1764     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1765     (qw/order_by group_by having/ )
1766   };
1767
1768   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1769 }
1770
1771 #
1772 # This is the code producing joined subqueries like:
1773 # SELECT me.*, other.* FROM ( SELECT me.* FROM ... ) JOIN other ON ... 
1774 #
1775 sub _adjust_select_args_for_complex_prefetch {
1776   my ($self, $from, $select, $where, $attrs) = @_;
1777
1778   $self->throw_exception ('Nothing to prefetch... how did we get here?!')
1779     if not @{$attrs->{_prefetch_select}};
1780
1781   $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
1782     if (ref $from ne 'ARRAY' || ref $from->[0] ne 'HASH' || ref $from->[1] ne 'ARRAY');
1783
1784
1785   # generate inner/outer attribute lists, remove stuff that doesn't apply
1786   my $outer_attrs = { %$attrs };
1787   delete $outer_attrs->{$_} for qw/where bind rows offset group_by having/;
1788
1789   my $inner_attrs = { %$attrs };
1790   delete $inner_attrs->{$_} for qw/for collapse _prefetch_select _collapse_order_by select as/;
1791
1792
1793   # bring over all non-collapse-induced order_by into the inner query (if any)
1794   # the outer one will have to keep them all
1795   delete $inner_attrs->{order_by};
1796   if (my $ord_cnt = @{$outer_attrs->{order_by}} - @{$outer_attrs->{_collapse_order_by}} ) {
1797     $inner_attrs->{order_by} = [
1798       @{$outer_attrs->{order_by}}[ 0 .. $ord_cnt - 1]
1799     ];
1800   }
1801
1802
1803   # generate the inner/outer select lists
1804   # for inside we consider only stuff *not* brought in by the prefetch
1805   # on the outside we substitute any function for its alias
1806   my $outer_select = [ @$select ];
1807   my $inner_select = [];
1808   for my $i (0 .. ( @$outer_select - @{$outer_attrs->{_prefetch_select}} - 1) ) {
1809     my $sel = $outer_select->[$i];
1810
1811     if (ref $sel eq 'HASH' ) {
1812       $sel->{-as} ||= $attrs->{as}[$i];
1813       $outer_select->[$i] = join ('.', $attrs->{alias}, ($sel->{-as} || "inner_column_$i") );
1814     }
1815
1816     push @$inner_select, $sel;
1817   }
1818
1819   # normalize a copy of $from, so it will be easier to work with further
1820   # down (i.e. promote the initial hashref to an AoH)
1821   $from = [ @$from ];
1822   $from->[0] = [ $from->[0] ];
1823   my %original_join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
1824
1825
1826   # decide which parts of the join will remain in either part of
1827   # the outer/inner query
1828
1829   # First we compose a list of which aliases are used in restrictions
1830   # (i.e. conditions/order/grouping/etc). Since we do not have
1831   # introspectable SQLA, we fall back to ugly scanning of raw SQL for
1832   # WHERE, and for pieces of ORDER BY in order to determine which aliases
1833   # need to appear in the resulting sql.
1834   # It may not be very efficient, but it's a reasonable stop-gap
1835   # Also unqualified column names will not be considered, but more often
1836   # than not this is actually ok
1837   #
1838   # In the same loop we enumerate part of the selection aliases, as
1839   # it requires the same sqla hack for the time being
1840   my ($restrict_aliases, $select_aliases, $prefetch_aliases);
1841   {
1842     # produce stuff unquoted, so it can be scanned
1843     my $sql_maker = $self->sql_maker;
1844     local $sql_maker->{quote_char};
1845     my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1846     $sep = "\Q$sep\E";
1847
1848     my $non_prefetch_select_sql = $sql_maker->_recurse_fields ($inner_select);
1849     my $prefetch_select_sql = $sql_maker->_recurse_fields ($outer_attrs->{_prefetch_select});
1850     my $where_sql = $sql_maker->where ($where);
1851     my $group_by_sql = $sql_maker->_order_by({
1852       map { $_ => $inner_attrs->{$_} } qw/group_by having/
1853     });
1854     my @non_prefetch_order_by_chunks = (map
1855       { ref $_ ? $_->[0] : $_ }
1856       $sql_maker->_order_by_chunks ($inner_attrs->{order_by})
1857     );
1858
1859
1860     for my $alias (keys %original_join_info) {
1861       my $seen_re = qr/\b $alias $sep/x;
1862
1863       for my $piece ($where_sql, $group_by_sql, @non_prefetch_order_by_chunks ) {
1864         if ($piece =~ $seen_re) {
1865           $restrict_aliases->{$alias} = 1;
1866         }
1867       }
1868
1869       if ($non_prefetch_select_sql =~ $seen_re) {
1870           $select_aliases->{$alias} = 1;
1871       }
1872
1873       if ($prefetch_select_sql =~ $seen_re) {
1874           $prefetch_aliases->{$alias} = 1;
1875       }
1876
1877     }
1878   }
1879
1880   # Add any non-left joins to the restriction list (such joins are indeed restrictions)
1881   for my $j (values %original_join_info) {
1882     my $alias = $j->{-alias} or next;
1883     $restrict_aliases->{$alias} = 1 if (
1884       (not $j->{-join_type})
1885         or
1886       ($j->{-join_type} !~ /^left (?: \s+ outer)? $/xi)
1887     );
1888   }
1889
1890   # mark all join parents as mentioned
1891   # (e.g.  join => { cds => 'tracks' } - tracks will need to bring cds too )
1892   for my $collection ($restrict_aliases, $select_aliases) {
1893     for my $alias (keys %$collection) {
1894       $collection->{$_} = 1
1895         for (@{ $original_join_info{$alias}{-join_path} || [] });
1896     }
1897   }
1898
1899   # construct the inner $from for the subquery
1900   my %inner_joins = (map { %{$_ || {}} } ($restrict_aliases, $select_aliases) );
1901   my @inner_from;
1902   for my $j (@$from) {
1903     push @inner_from, $j if $inner_joins{$j->[0]{-alias}};
1904   }
1905
1906   # if a multi-type join was needed in the subquery ("multi" is indicated by
1907   # presence in {collapse}) - add a group_by to simulate the collapse in the subq
1908   unless ($inner_attrs->{group_by}) {
1909     for my $alias (keys %inner_joins) {
1910
1911       # the dot comes from some weirdness in collapse
1912       # remove after the rewrite
1913       if ($attrs->{collapse}{".$alias"}) {
1914         $inner_attrs->{group_by} ||= $inner_select;
1915         last;
1916       }
1917     }
1918   }
1919
1920   # demote the inner_from head
1921   $inner_from[0] = $inner_from[0][0];
1922
1923   # generate the subquery
1924   my $subq = $self->_select_args_to_query (
1925     \@inner_from,
1926     $inner_select,
1927     $where,
1928     $inner_attrs,
1929   );
1930
1931   my $subq_joinspec = {
1932     -alias => $attrs->{alias},
1933     -source_handle => $inner_from[0]{-source_handle},
1934     $attrs->{alias} => $subq,
1935   };
1936
1937   # Generate the outer from - this is relatively easy (really just replace
1938   # the join slot with the subquery), with a major caveat - we can not
1939   # join anything that is non-selecting (not part of the prefetch), but at
1940   # the same time is a multi-type relationship, as it will explode the result.
1941   #
1942   # There are two possibilities here
1943   # - either the join is non-restricting, in which case we simply throw it away
1944   # - it is part of the restrictions, in which case we need to collapse the outer
1945   #   result by tackling yet another group_by to the outside of the query
1946
1947   # so first generate the outer_from, up to the substitution point
1948   my @outer_from;
1949   while (my $j = shift @$from) {
1950     if ($j->[0]{-alias} eq $attrs->{alias}) { # time to swap
1951       push @outer_from, [
1952         $subq_joinspec,
1953         @{$j}[1 .. $#$j],
1954       ];
1955       last; # we'll take care of what's left in $from below
1956     }
1957     else {
1958       push @outer_from, $j;
1959     }
1960   }
1961
1962   # see what's left - throw away if not selecting/restricting
1963   # also throw in a group_by if restricting to guard against
1964   # cross-join explosions
1965   #
1966   while (my $j = shift @$from) {
1967     my $alias = $j->[0]{-alias};
1968
1969     if ($select_aliases->{$alias} || $prefetch_aliases->{$alias}) {
1970       push @outer_from, $j;
1971     }
1972     elsif ($restrict_aliases->{$alias}) {
1973       push @outer_from, $j;
1974
1975       # FIXME - this should be obviated by SQLA2, as I'll be able to 
1976       # have restrict_inner and restrict_outer... or something to that
1977       # effect... I think...
1978
1979       # FIXME2 - I can't find a clean way to determine if a particular join
1980       # is a multi - instead I am just treating everything as a potential
1981       # explosive join (ribasushi)
1982       #
1983       # if (my $handle = $j->[0]{-source_handle}) {
1984       #   my $rsrc = $handle->resolve;
1985       #   ... need to bail out of the following if this is not a multi,
1986       #       as it will be much easier on the db ...
1987
1988           $outer_attrs->{group_by} ||= $outer_select;
1989       # }
1990     }
1991   }
1992
1993   # demote the outer_from head
1994   $outer_from[0] = $outer_from[0][0];
1995
1996   # This is totally horrific - the $where ends up in both the inner and outer query
1997   # Unfortunately not much can be done until SQLA2 introspection arrives, and even
1998   # then if where conditions apply to the *right* side of the prefetch, you may have
1999   # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
2000   # the outer select to exclude joins you didin't want in the first place
2001   #
2002   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
2003   return (\@outer_from, $outer_select, $where, $outer_attrs);
2004 }
2005
2006 sub _resolve_ident_sources {
2007   my ($self, $ident) = @_;
2008
2009   my $alias2source = {};
2010   my $rs_alias;
2011
2012   # the reason this is so contrived is that $ident may be a {from}
2013   # structure, specifying multiple tables to join
2014   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
2015     # this is compat mode for insert/update/delete which do not deal with aliases
2016     $alias2source->{me} = $ident;
2017     $rs_alias = 'me';
2018   }
2019   elsif (ref $ident eq 'ARRAY') {
2020
2021     for (@$ident) {
2022       my $tabinfo;
2023       if (ref $_ eq 'HASH') {
2024         $tabinfo = $_;
2025         $rs_alias = $tabinfo->{-alias};
2026       }
2027       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
2028         $tabinfo = $_->[0];
2029       }
2030
2031       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
2032         if ($tabinfo->{-source_handle});
2033     }
2034   }
2035
2036   return ($alias2source, $rs_alias);
2037 }
2038
2039 # Takes $ident, \@column_names
2040 #
2041 # returns { $column_name => \%column_info, ... }
2042 # also note: this adds -result_source => $rsrc to the column info
2043 #
2044 # usage:
2045 #   my $col_sources = $self->_resolve_column_info($ident, @column_names);
2046 sub _resolve_column_info {
2047   my ($self, $ident, $colnames) = @_;
2048   my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
2049
2050   my $sep = $self->_sql_maker_opts->{name_sep} || '.';
2051   $sep = "\Q$sep\E";
2052
2053   my (%return, %seen_cols);
2054
2055   # compile a global list of column names, to be able to properly
2056   # disambiguate unqualified column names (if at all possible)
2057   for my $alias (keys %$alias2src) {
2058     my $rsrc = $alias2src->{$alias};
2059     for my $colname ($rsrc->columns) {
2060       push @{$seen_cols{$colname}}, $alias;
2061     }
2062   }
2063
2064   COLUMN:
2065   foreach my $col (@$colnames) {
2066     my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
2067
2068     unless ($alias) {
2069       # see if the column was seen exactly once (so we know which rsrc it came from)
2070       if ($seen_cols{$colname} and @{$seen_cols{$colname}} == 1) {
2071         $alias = $seen_cols{$colname}[0];
2072       }
2073       else {
2074         next COLUMN;
2075       }
2076     }
2077
2078     my $rsrc = $alias2src->{$alias};
2079     $return{$col} = $rsrc && {
2080       %{$rsrc->column_info($colname)},
2081       -result_source => $rsrc,
2082       -source_alias => $alias,
2083     };
2084   }
2085
2086   return \%return;
2087 }
2088
2089 # Returns a counting SELECT for a simple count
2090 # query. Abstracted so that a storage could override
2091 # this to { count => 'firstcol' } or whatever makes
2092 # sense as a performance optimization
2093 sub _count_select {
2094   #my ($self, $source, $rs_attrs) = @_;
2095   return { count => '*' };
2096 }
2097
2098 # Returns a SELECT which will end up in the subselect
2099 # There may or may not be a group_by, as the subquery
2100 # might have been called to accomodate a limit
2101 #
2102 # Most databases would be happy with whatever ends up
2103 # here, but some choke in various ways.
2104 #
2105 sub _subq_count_select {
2106   my ($self, $source, $rs_attrs) = @_;
2107   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
2108
2109   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
2110   return @pcols ? \@pcols : [ 1 ];
2111 }
2112
2113 sub source_bind_attributes {
2114   my ($self, $source) = @_;
2115
2116   my $bind_attributes;
2117   foreach my $column ($source->columns) {
2118
2119     my $data_type = $source->column_info($column)->{data_type} || '';
2120     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2121      if $data_type;
2122   }
2123
2124   return $bind_attributes;
2125 }
2126
2127 =head2 select
2128
2129 =over 4
2130
2131 =item Arguments: $ident, $select, $condition, $attrs
2132
2133 =back
2134
2135 Handle a SQL select statement.
2136
2137 =cut
2138
2139 sub select {
2140   my $self = shift;
2141   my ($ident, $select, $condition, $attrs) = @_;
2142   return $self->cursor_class->new($self, \@_, $attrs);
2143 }
2144
2145 sub select_single {
2146   my $self = shift;
2147   my ($rv, $sth, @bind) = $self->_select(@_);
2148   my @row = $sth->fetchrow_array;
2149   my @nextrow = $sth->fetchrow_array if @row;
2150   if(@row && @nextrow) {
2151     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2152   }
2153   # Need to call finish() to work round broken DBDs
2154   $sth->finish();
2155   return @row;
2156 }
2157
2158 =head2 sth
2159
2160 =over 4
2161
2162 =item Arguments: $sql
2163
2164 =back
2165
2166 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2167
2168 =cut
2169
2170 sub _dbh_sth {
2171   my ($self, $dbh, $sql) = @_;
2172
2173   # 3 is the if_active parameter which avoids active sth re-use
2174   my $sth = $self->disable_sth_caching
2175     ? $dbh->prepare($sql)
2176     : $dbh->prepare_cached($sql, {}, 3);
2177
2178   # XXX You would think RaiseError would make this impossible,
2179   #  but apparently that's not true :(
2180   $self->throw_exception($dbh->errstr) if !$sth;
2181
2182   $sth;
2183 }
2184
2185 sub sth {
2186   my ($self, $sql) = @_;
2187   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2188 }
2189
2190 sub _dbh_columns_info_for {
2191   my ($self, $dbh, $table) = @_;
2192
2193   if ($dbh->can('column_info')) {
2194     my %result;
2195     eval {
2196       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2197       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2198       $sth->execute();
2199       while ( my $info = $sth->fetchrow_hashref() ){
2200         my %column_info;
2201         $column_info{data_type}   = $info->{TYPE_NAME};
2202         $column_info{size}      = $info->{COLUMN_SIZE};
2203         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2204         $column_info{default_value} = $info->{COLUMN_DEF};
2205         my $col_name = $info->{COLUMN_NAME};
2206         $col_name =~ s/^\"(.*)\"$/$1/;
2207
2208         $result{$col_name} = \%column_info;
2209       }
2210     };
2211     return \%result if !$@ && scalar keys %result;
2212   }
2213
2214   my %result;
2215   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2216   $sth->execute;
2217   my @columns = @{$sth->{NAME_lc}};
2218   for my $i ( 0 .. $#columns ){
2219     my %column_info;
2220     $column_info{data_type} = $sth->{TYPE}->[$i];
2221     $column_info{size} = $sth->{PRECISION}->[$i];
2222     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2223
2224     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2225       $column_info{data_type} = $1;
2226       $column_info{size}    = $2;
2227     }
2228
2229     $result{$columns[$i]} = \%column_info;
2230   }
2231   $sth->finish;
2232
2233   foreach my $col (keys %result) {
2234     my $colinfo = $result{$col};
2235     my $type_num = $colinfo->{data_type};
2236     my $type_name;
2237     if(defined $type_num && $dbh->can('type_info')) {
2238       my $type_info = $dbh->type_info($type_num);
2239       $type_name = $type_info->{TYPE_NAME} if $type_info;
2240       $colinfo->{data_type} = $type_name if $type_name;
2241     }
2242   }
2243
2244   return \%result;
2245 }
2246
2247 sub columns_info_for {
2248   my ($self, $table) = @_;
2249   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2250 }
2251
2252 =head2 last_insert_id
2253
2254 Return the row id of the last insert.
2255
2256 =cut
2257
2258 sub _dbh_last_insert_id {
2259     # All Storage's need to register their own _dbh_last_insert_id
2260     # the old SQLite-based method was highly inappropriate
2261
2262     my $self = shift;
2263     my $class = ref $self;
2264     $self->throw_exception (<<EOE);
2265
2266 No _dbh_last_insert_id() method found in $class.
2267 Since the method of obtaining the autoincrement id of the last insert
2268 operation varies greatly between different databases, this method must be
2269 individually implemented for every storage class.
2270 EOE
2271 }
2272
2273 sub last_insert_id {
2274   my $self = shift;
2275   $self->_dbh_last_insert_id ($self->_dbh, @_);
2276 }
2277
2278 =head2 _native_data_type
2279
2280 =over 4
2281
2282 =item Arguments: $type_name
2283
2284 =back
2285
2286 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2287 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2288 L<::Sybase|DBIx::Class::Storage::DBI::Sybase>.
2289
2290 The default implementation returns C<undef>, implement in your Storage driver if
2291 you need this functionality.
2292
2293 Should map types from other databases to the native RDBMS type, for example
2294 C<VARCHAR2> to C<VARCHAR>.
2295
2296 Types with modifiers should map to the underlying data type. For example,
2297 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2298
2299 Composite types should map to the container type, for example
2300 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2301
2302 =cut
2303
2304 sub _native_data_type {
2305   #my ($self, $data_type) = @_;
2306   return undef
2307 }
2308
2309 # Check if placeholders are supported at all
2310 sub _placeholders_supported {
2311   my $self = shift;
2312   my $dbh  = $self->_get_dbh;
2313
2314   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2315   # but it is inaccurate more often than not
2316   eval {
2317     local $dbh->{PrintError} = 0;
2318     local $dbh->{RaiseError} = 1;
2319     $dbh->do('select ?', {}, 1);
2320   };
2321   return $@ ? 0 : 1;
2322 }
2323
2324 # Check if placeholders bound to non-string types throw exceptions
2325 #
2326 sub _typeless_placeholders_supported {
2327   my $self = shift;
2328   my $dbh  = $self->_get_dbh;
2329
2330   eval {
2331     local $dbh->{PrintError} = 0;
2332     local $dbh->{RaiseError} = 1;
2333     # this specifically tests a bind that is NOT a string
2334     $dbh->do('select 1 where 1 = ?', {}, 1);
2335   };
2336   return $@ ? 0 : 1;
2337 }
2338
2339 =head2 sqlt_type
2340
2341 Returns the database driver name.
2342
2343 =cut
2344
2345 sub sqlt_type {
2346   my ($self) = @_;
2347
2348   if (not $self->_driver_determined) {
2349     $self->_determine_driver;
2350     goto $self->can ('sqlt_type');
2351   }
2352
2353   $self->_get_dbh->{Driver}->{Name};
2354 }
2355
2356 =head2 bind_attribute_by_data_type
2357
2358 Given a datatype from column info, returns a database specific bind
2359 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2360 let the database planner just handle it.
2361
2362 Generally only needed for special case column types, like bytea in postgres.
2363
2364 =cut
2365
2366 sub bind_attribute_by_data_type {
2367     return;
2368 }
2369
2370 =head2 is_datatype_numeric
2371
2372 Given a datatype from column_info, returns a boolean value indicating if
2373 the current RDBMS considers it a numeric value. This controls how
2374 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2375 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2376 be performed instead of the usual C<eq>.
2377
2378 =cut
2379
2380 sub is_datatype_numeric {
2381   my ($self, $dt) = @_;
2382
2383   return 0 unless $dt;
2384
2385   return $dt =~ /^ (?:
2386     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2387   ) $/ix;
2388 }
2389
2390
2391 =head2 create_ddl_dir (EXPERIMENTAL)
2392
2393 =over 4
2394
2395 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2396
2397 =back
2398
2399 Creates a SQL file based on the Schema, for each of the specified
2400 database engines in C<\@databases> in the given directory.
2401 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2402
2403 Given a previous version number, this will also create a file containing
2404 the ALTER TABLE statements to transform the previous schema into the
2405 current one. Note that these statements may contain C<DROP TABLE> or
2406 C<DROP COLUMN> statements that can potentially destroy data.
2407
2408 The file names are created using the C<ddl_filename> method below, please
2409 override this method in your schema if you would like a different file
2410 name format. For the ALTER file, the same format is used, replacing
2411 $version in the name with "$preversion-$version".
2412
2413 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2414 The most common value for this would be C<< { add_drop_table => 1 } >>
2415 to have the SQL produced include a C<DROP TABLE> statement for each table
2416 created. For quoting purposes supply C<quote_table_names> and
2417 C<quote_field_names>.
2418
2419 If no arguments are passed, then the following default values are assumed:
2420
2421 =over 4
2422
2423 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2424
2425 =item version    - $schema->schema_version
2426
2427 =item directory  - './'
2428
2429 =item preversion - <none>
2430
2431 =back
2432
2433 By default, C<\%sqlt_args> will have
2434
2435  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2436
2437 merged with the hash passed in. To disable any of those features, pass in a
2438 hashref like the following
2439
2440  { ignore_constraint_names => 0, # ... other options }
2441
2442
2443 Note that this feature is currently EXPERIMENTAL and may not work correctly
2444 across all databases, or fully handle complex relationships.
2445
2446 WARNING: Please check all SQL files created, before applying them.
2447
2448 =cut
2449
2450 sub create_ddl_dir {
2451   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2452
2453   if(!$dir || !-d $dir) {
2454     carp "No directory given, using ./\n";
2455     $dir = "./";
2456   }
2457   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2458   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2459
2460   my $schema_version = $schema->schema_version || '1.x';
2461   $version ||= $schema_version;
2462
2463   $sqltargs = {
2464     add_drop_table => 1,
2465     ignore_constraint_names => 1,
2466     ignore_index_names => 1,
2467     %{$sqltargs || {}}
2468   };
2469
2470   $self->throw_exception("Can't create a ddl file without SQL::Translator: " . $self->_sqlt_version_error)
2471     if !$self->_sqlt_version_ok;
2472
2473   my $sqlt = SQL::Translator->new( $sqltargs );
2474
2475   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2476   my $sqlt_schema = $sqlt->translate({ data => $schema })
2477     or $self->throw_exception ($sqlt->error);
2478
2479   foreach my $db (@$databases) {
2480     $sqlt->reset();
2481     $sqlt->{schema} = $sqlt_schema;
2482     $sqlt->producer($db);
2483
2484     my $file;
2485     my $filename = $schema->ddl_filename($db, $version, $dir);
2486     if (-e $filename && ($version eq $schema_version )) {
2487       # if we are dumping the current version, overwrite the DDL
2488       carp "Overwriting existing DDL file - $filename";
2489       unlink($filename);
2490     }
2491
2492     my $output = $sqlt->translate;
2493     if(!$output) {
2494       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2495       next;
2496     }
2497     if(!open($file, ">$filename")) {
2498       $self->throw_exception("Can't open $filename for writing ($!)");
2499       next;
2500     }
2501     print $file $output;
2502     close($file);
2503
2504     next unless ($preversion);
2505
2506     require SQL::Translator::Diff;
2507
2508     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2509     if(!-e $prefilename) {
2510       carp("No previous schema file found ($prefilename)");
2511       next;
2512     }
2513
2514     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2515     if(-e $difffile) {
2516       carp("Overwriting existing diff file - $difffile");
2517       unlink($difffile);
2518     }
2519
2520     my $source_schema;
2521     {
2522       my $t = SQL::Translator->new($sqltargs);
2523       $t->debug( 0 );
2524       $t->trace( 0 );
2525
2526       $t->parser( $db )
2527         or $self->throw_exception ($t->error);
2528
2529       my $out = $t->translate( $prefilename )
2530         or $self->throw_exception ($t->error);
2531
2532       $source_schema = $t->schema;
2533
2534       $source_schema->name( $prefilename )
2535         unless ( $source_schema->name );
2536     }
2537
2538     # The "new" style of producers have sane normalization and can support
2539     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2540     # And we have to diff parsed SQL against parsed SQL.
2541     my $dest_schema = $sqlt_schema;
2542
2543     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2544       my $t = SQL::Translator->new($sqltargs);
2545       $t->debug( 0 );
2546       $t->trace( 0 );
2547
2548       $t->parser( $db )
2549         or $self->throw_exception ($t->error);
2550
2551       my $out = $t->translate( $filename )
2552         or $self->throw_exception ($t->error);
2553
2554       $dest_schema = $t->schema;
2555
2556       $dest_schema->name( $filename )
2557         unless $dest_schema->name;
2558     }
2559
2560     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2561                                                   $dest_schema,   $db,
2562                                                   $sqltargs
2563                                                  );
2564     if(!open $file, ">$difffile") {
2565       $self->throw_exception("Can't write to $difffile ($!)");
2566       next;
2567     }
2568     print $file $diff;
2569     close($file);
2570   }
2571 }
2572
2573 =head2 deployment_statements
2574
2575 =over 4
2576
2577 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2578
2579 =back
2580
2581 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2582
2583 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2584 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2585
2586 C<$directory> is used to return statements from files in a previously created
2587 L</create_ddl_dir> directory and is optional. The filenames are constructed
2588 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2589
2590 If no C<$directory> is specified then the statements are constructed on the
2591 fly using L<SQL::Translator> and C<$version> is ignored.
2592
2593 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2594
2595 =cut
2596
2597 sub deployment_statements {
2598   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2599   $type ||= $self->sqlt_type;
2600   $version ||= $schema->schema_version || '1.x';
2601   $dir ||= './';
2602   my $filename = $schema->ddl_filename($type, $version, $dir);
2603   if(-f $filename)
2604   {
2605       my $file;
2606       open($file, "<$filename")
2607         or $self->throw_exception("Can't open $filename ($!)");
2608       my @rows = <$file>;
2609       close($file);
2610       return join('', @rows);
2611   }
2612
2613   $self->throw_exception("Can't deploy without either SQL::Translator or a ddl_dir: " . $self->_sqlt_version_error )
2614     if !$self->_sqlt_version_ok;
2615
2616   # sources needs to be a parser arg, but for simplicty allow at top level
2617   # coming in
2618   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2619       if exists $sqltargs->{sources};
2620
2621   my $tr = SQL::Translator->new(
2622     producer => "SQL::Translator::Producer::${type}",
2623     %$sqltargs,
2624     parser => 'SQL::Translator::Parser::DBIx::Class',
2625     data => $schema,
2626   );
2627   return $tr->translate;
2628 }
2629
2630 sub deploy {
2631   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2632   my $deploy = sub {
2633     my $line = shift;
2634     return if($line =~ /^--/);
2635     return if(!$line);
2636     # next if($line =~ /^DROP/m);
2637     return if($line =~ /^BEGIN TRANSACTION/m);
2638     return if($line =~ /^COMMIT/m);
2639     return if $line =~ /^\s+$/; # skip whitespace only
2640     $self->_query_start($line);
2641     eval {
2642       # do a dbh_do cycle here, as we need some error checking in
2643       # place (even though we will ignore errors)
2644       $self->dbh_do (sub { $_[1]->do($line) });
2645     };
2646     if ($@) {
2647       carp qq{$@ (running "${line}")};
2648     }
2649     $self->_query_end($line);
2650   };
2651   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2652   if (@statements > 1) {
2653     foreach my $statement (@statements) {
2654       $deploy->( $statement );
2655     }
2656   }
2657   elsif (@statements == 1) {
2658     foreach my $line ( split(";\n", $statements[0])) {
2659       $deploy->( $line );
2660     }
2661   }
2662 }
2663
2664 =head2 datetime_parser
2665
2666 Returns the datetime parser class
2667
2668 =cut
2669
2670 sub datetime_parser {
2671   my $self = shift;
2672   return $self->{datetime_parser} ||= do {
2673     $self->build_datetime_parser(@_);
2674   };
2675 }
2676
2677 =head2 datetime_parser_type
2678
2679 Defines (returns) the datetime parser class - currently hardwired to
2680 L<DateTime::Format::MySQL>
2681
2682 =cut
2683
2684 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2685
2686 =head2 build_datetime_parser
2687
2688 See L</datetime_parser>
2689
2690 =cut
2691
2692 sub build_datetime_parser {
2693   if (not $_[0]->_driver_determined) {
2694     $_[0]->_determine_driver;
2695     goto $_[0]->can('build_datetime_parser');
2696   }
2697
2698   my $self = shift;
2699   my $type = $self->datetime_parser_type(@_);
2700   $self->ensure_class_loaded ($type);
2701   return $type;
2702 }
2703
2704
2705 =head2 is_replicating
2706
2707 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2708 replicate from a master database.  Default is undef, which is the result
2709 returned by databases that don't support replication.
2710
2711 =cut
2712
2713 sub is_replicating {
2714     return;
2715
2716 }
2717
2718 =head2 lag_behind_master
2719
2720 Returns a number that represents a certain amount of lag behind a master db
2721 when a given storage is replicating.  The number is database dependent, but
2722 starts at zero and increases with the amount of lag. Default in undef
2723
2724 =cut
2725
2726 sub lag_behind_master {
2727     return;
2728 }
2729
2730 # SQLT version handling
2731 {
2732   my $_sqlt_version_ok;     # private
2733   my $_sqlt_version_error;  # private
2734
2735   sub _sqlt_version_ok {
2736     if (!defined $_sqlt_version_ok) {
2737       eval "use SQL::Translator $minimum_sqlt_version";
2738       if ($@) {
2739         $_sqlt_version_ok = 0;
2740         $_sqlt_version_error = $@;
2741       }
2742       else {
2743         $_sqlt_version_ok = 1;
2744       }
2745     }
2746     return $_sqlt_version_ok;
2747   }
2748
2749   sub _sqlt_version_error {
2750     shift->_sqlt_version_ok unless defined $_sqlt_version_ok;
2751     return $_sqlt_version_error;
2752   }
2753
2754   sub _sqlt_minimum_version { $minimum_sqlt_version };
2755 }
2756
2757 sub DESTROY {
2758   my $self = shift;
2759
2760   $self->_verify_pid if $self->_dbh;
2761
2762   # some databases need this to stop spewing warnings
2763   if (my $dbh = $self->_dbh) {
2764     local $@;
2765     eval { $dbh->disconnect };
2766   }
2767
2768   $self->_dbh(undef);
2769 }
2770
2771 1;
2772
2773 =head1 USAGE NOTES
2774
2775 =head2 DBIx::Class and AutoCommit
2776
2777 DBIx::Class can do some wonderful magic with handling exceptions,
2778 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2779 (the default) combined with C<txn_do> for transaction support.
2780
2781 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2782 in an assumed transaction between commits, and you're telling us you'd
2783 like to manage that manually.  A lot of the magic protections offered by
2784 this module will go away.  We can't protect you from exceptions due to database
2785 disconnects because we don't know anything about how to restart your
2786 transactions.  You're on your own for handling all sorts of exceptional
2787 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2788 be with raw DBI.
2789
2790
2791 =head1 AUTHORS
2792
2793 Matt S. Trout <mst@shadowcatsystems.co.uk>
2794
2795 Andy Grundman <andy@hybridized.org>
2796
2797 =head1 LICENSE
2798
2799 You may distribute this code under the same terms as Perl itself.
2800
2801 =cut