Fix left-join chaining
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base 'DBIx::Class::Storage';
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16
17 __PACKAGE__->mk_group_accessors('simple' =>
18   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
19      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
20 );
21
22 # the values for these accessors are picked out (and deleted) from
23 # the attribute hashref passed to connect_info
24 my @storage_options = qw/
25   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
26   disable_sth_caching unsafe auto_savepoint
27 /;
28 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
29
30
31 # default cursor class, overridable in connect_info attributes
32 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
33
34 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
35 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
36
37
38 =head1 NAME
39
40 DBIx::Class::Storage::DBI - DBI storage handler
41
42 =head1 SYNOPSIS
43
44   my $schema = MySchema->connect('dbi:SQLite:my.db');
45
46   $schema->storage->debug(1);
47
48   my @stuff = $schema->storage->dbh_do(
49     sub {
50       my ($storage, $dbh, @args) = @_;
51       $dbh->do("DROP TABLE authors");
52     },
53     @column_list
54   );
55
56   $schema->resultset('Book')->search({
57      written_on => $schema->storage->datetime_parser(DateTime->now)
58   });
59
60 =head1 DESCRIPTION
61
62 This class represents the connection to an RDBMS via L<DBI>.  See
63 L<DBIx::Class::Storage> for general information.  This pod only
64 documents DBI-specific methods and behaviors.
65
66 =head1 METHODS
67
68 =cut
69
70 sub new {
71   my $new = shift->next::method(@_);
72
73   $new->transaction_depth(0);
74   $new->_sql_maker_opts({});
75   $new->{savepoints} = [];
76   $new->{_in_dbh_do} = 0;
77   $new->{_dbh_gen} = 0;
78
79   $new;
80 }
81
82 =head2 connect_info
83
84 This method is normally called by L<DBIx::Class::Schema/connection>, which
85 encapsulates its argument list in an arrayref before passing them here.
86
87 The argument list may contain:
88
89 =over
90
91 =item *
92
93 The same 4-element argument set one would normally pass to
94 L<DBI/connect>, optionally followed by
95 L<extra attributes|/DBIx::Class specific connection attributes>
96 recognized by DBIx::Class:
97
98   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
99
100 =item *
101
102 A single code reference which returns a connected
103 L<DBI database handle|DBI/connect> optionally followed by
104 L<extra attributes|/DBIx::Class specific connection attributes> recognized
105 by DBIx::Class:
106
107   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
108
109 =item *
110
111 A single hashref with all the attributes and the dsn/user/password
112 mixed together:
113
114   $connect_info_args = [{
115     dsn => $dsn,
116     user => $user,
117     password => $pass,
118     %dbi_attributes,
119     %extra_attributes,
120   }];
121
122   $connect_info_args = [{
123     dbh_maker => sub { DBI->connect (...) },
124     %dbi_attributes,
125     %extra_attributes,
126   }];
127
128 This is particularly useful for L<Catalyst> based applications, allowing the
129 following config (L<Config::General> style):
130
131   <Model::DB>
132     schema_class   App::DB
133     <connect_info>
134       dsn          dbi:mysql:database=test
135       user         testuser
136       password     TestPass
137       AutoCommit   1
138     </connect_info>
139   </Model::DB>
140
141 The C<dsn>/C<user>/C<password> combination can be substituted by the
142 C<dbh_maker> key whose value is a coderef that returns a connected
143 L<DBI database handle|DBI/connect>
144
145 =back
146
147 Please note that the L<DBI> docs recommend that you always explicitly
148 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
149 recommends that it be set to I<1>, and that you perform transactions
150 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
151 to I<1> if you do not do explicitly set it to zero.  This is the default
152 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
153
154 =head3 DBIx::Class specific connection attributes
155
156 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
157 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
158 the following connection options. These options can be mixed in with your other
159 L<DBI> connection attributes, or placed in a seperate hashref
160 (C<\%extra_attributes>) as shown above.
161
162 Every time C<connect_info> is invoked, any previous settings for
163 these options will be cleared before setting the new ones, regardless of
164 whether any options are specified in the new C<connect_info>.
165
166
167 =over
168
169 =item on_connect_do
170
171 Specifies things to do immediately after connecting or re-connecting to
172 the database.  Its value may contain:
173
174 =over
175
176 =item a scalar
177
178 This contains one SQL statement to execute.
179
180 =item an array reference
181
182 This contains SQL statements to execute in order.  Each element contains
183 a string or a code reference that returns a string.
184
185 =item a code reference
186
187 This contains some code to execute.  Unlike code references within an
188 array reference, its return value is ignored.
189
190 =back
191
192 =item on_disconnect_do
193
194 Takes arguments in the same form as L</on_connect_do> and executes them
195 immediately before disconnecting from the database.
196
197 Note, this only runs if you explicitly call L</disconnect> on the
198 storage object.
199
200 =item on_connect_call
201
202 A more generalized form of L</on_connect_do> that calls the specified
203 C<connect_call_METHOD> methods in your storage driver.
204
205   on_connect_do => 'select 1'
206
207 is equivalent to:
208
209   on_connect_call => [ [ do_sql => 'select 1' ] ]
210
211 Its values may contain:
212
213 =over
214
215 =item a scalar
216
217 Will call the C<connect_call_METHOD> method.
218
219 =item a code reference
220
221 Will execute C<< $code->($storage) >>
222
223 =item an array reference
224
225 Each value can be a method name or code reference.
226
227 =item an array of arrays
228
229 For each array, the first item is taken to be the C<connect_call_> method name
230 or code reference, and the rest are parameters to it.
231
232 =back
233
234 Some predefined storage methods you may use:
235
236 =over
237
238 =item do_sql
239
240 Executes a SQL string or a code reference that returns a SQL string. This is
241 what L</on_connect_do> and L</on_disconnect_do> use.
242
243 It can take:
244
245 =over
246
247 =item a scalar
248
249 Will execute the scalar as SQL.
250
251 =item an arrayref
252
253 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
254 attributes hashref and bind values.
255
256 =item a code reference
257
258 Will execute C<< $code->($storage) >> and execute the return array refs as
259 above.
260
261 =back
262
263 =item datetime_setup
264
265 Execute any statements necessary to initialize the database session to return
266 and accept datetime/timestamp values used with
267 L<DBIx::Class::InflateColumn::DateTime>.
268
269 Only necessary for some databases, see your specific storage driver for
270 implementation details.
271
272 =back
273
274 =item on_disconnect_call
275
276 Takes arguments in the same form as L</on_connect_call> and executes them
277 immediately before disconnecting from the database.
278
279 Calls the C<disconnect_call_METHOD> methods as opposed to the
280 C<connect_call_METHOD> methods called by L</on_connect_call>.
281
282 Note, this only runs if you explicitly call L</disconnect> on the
283 storage object.
284
285 =item disable_sth_caching
286
287 If set to a true value, this option will disable the caching of
288 statement handles via L<DBI/prepare_cached>.
289
290 =item limit_dialect
291
292 Sets the limit dialect. This is useful for JDBC-bridge among others
293 where the remote SQL-dialect cannot be determined by the name of the
294 driver alone. See also L<SQL::Abstract::Limit>.
295
296 =item quote_char
297
298 Specifies what characters to use to quote table and column names. If
299 you use this you will want to specify L</name_sep> as well.
300
301 C<quote_char> expects either a single character, in which case is it
302 is placed on either side of the table/column name, or an arrayref of length
303 2 in which case the table/column name is placed between the elements.
304
305 For example under MySQL you should use C<< quote_char => '`' >>, and for
306 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
307
308 =item name_sep
309
310 This only needs to be used in conjunction with C<quote_char>, and is used to
311 specify the charecter that seperates elements (schemas, tables, columns) from
312 each other. In most cases this is simply a C<.>.
313
314 The consequences of not supplying this value is that L<SQL::Abstract>
315 will assume DBIx::Class' uses of aliases to be complete column
316 names. The output will look like I<"me.name"> when it should actually
317 be I<"me"."name">.
318
319 =item unsafe
320
321 This Storage driver normally installs its own C<HandleError>, sets
322 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
323 all database handles, including those supplied by a coderef.  It does this
324 so that it can have consistent and useful error behavior.
325
326 If you set this option to a true value, Storage will not do its usual
327 modifications to the database handle's attributes, and instead relies on
328 the settings in your connect_info DBI options (or the values you set in
329 your connection coderef, in the case that you are connecting via coderef).
330
331 Note that your custom settings can cause Storage to malfunction,
332 especially if you set a C<HandleError> handler that suppresses exceptions
333 and/or disable C<RaiseError>.
334
335 =item auto_savepoint
336
337 If this option is true, L<DBIx::Class> will use savepoints when nesting
338 transactions, making it possible to recover from failure in the inner
339 transaction without having to abort all outer transactions.
340
341 =item cursor_class
342
343 Use this argument to supply a cursor class other than the default
344 L<DBIx::Class::Storage::DBI::Cursor>.
345
346 =back
347
348 Some real-life examples of arguments to L</connect_info> and
349 L<DBIx::Class::Schema/connect>
350
351   # Simple SQLite connection
352   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
353
354   # Connect via subref
355   ->connect_info([ sub { DBI->connect(...) } ]);
356
357   # Connect via subref in hashref
358   ->connect_info([{
359     dbh_maker => sub { DBI->connect(...) },
360     on_connect_do => 'alter session ...',
361   }]);
362
363   # A bit more complicated
364   ->connect_info(
365     [
366       'dbi:Pg:dbname=foo',
367       'postgres',
368       'my_pg_password',
369       { AutoCommit => 1 },
370       { quote_char => q{"}, name_sep => q{.} },
371     ]
372   );
373
374   # Equivalent to the previous example
375   ->connect_info(
376     [
377       'dbi:Pg:dbname=foo',
378       'postgres',
379       'my_pg_password',
380       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
381     ]
382   );
383
384   # Same, but with hashref as argument
385   # See parse_connect_info for explanation
386   ->connect_info(
387     [{
388       dsn         => 'dbi:Pg:dbname=foo',
389       user        => 'postgres',
390       password    => 'my_pg_password',
391       AutoCommit  => 1,
392       quote_char  => q{"},
393       name_sep    => q{.},
394     }]
395   );
396
397   # Subref + DBIx::Class-specific connection options
398   ->connect_info(
399     [
400       sub { DBI->connect(...) },
401       {
402           quote_char => q{`},
403           name_sep => q{@},
404           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
405           disable_sth_caching => 1,
406       },
407     ]
408   );
409
410
411
412 =cut
413
414 sub connect_info {
415   my ($self, $info_arg) = @_;
416
417   return $self->_connect_info if !$info_arg;
418
419   my @args = @$info_arg;  # take a shallow copy for further mutilation
420   $self->_connect_info([@args]); # copy for _connect_info
421
422
423   # combine/pre-parse arguments depending on invocation style
424
425   my %attrs;
426   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
427     %attrs = %{ $args[1] || {} };
428     @args = $args[0];
429   }
430   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
431     %attrs = %{$args[0]};
432     @args = ();
433     if (my $code = delete $attrs{dbh_maker}) {
434       @args = $code;
435
436       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
437       if (@ignored) {
438         carp sprintf (
439             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
440           . "to the result of 'dbh_maker'",
441
442           join (', ', map { "'$_'" } (@ignored) ),
443         );
444       }
445     }
446     else {
447       @args = delete @attrs{qw/dsn user password/};
448     }
449   }
450   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
451     %attrs = (
452       % { $args[3] || {} },
453       % { $args[4] || {} },
454     );
455     @args = @args[0,1,2];
456   }
457
458   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
459   #  the new set of options
460   $self->_sql_maker(undef);
461   $self->_sql_maker_opts({});
462
463   if(keys %attrs) {
464     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
465       if(my $value = delete $attrs{$storage_opt}) {
466         $self->$storage_opt($value);
467       }
468     }
469     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
470       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
471         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
472       }
473     }
474   }
475
476   if (ref $args[0] eq 'CODE') {
477     # _connect() never looks past $args[0] in this case
478     %attrs = ()
479   } else {
480     %attrs = (
481       %{ $self->_default_dbi_connect_attributes || {} },
482       %attrs,
483     );
484   }
485
486   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
487   $self->_connect_info;
488 }
489
490 sub _default_dbi_connect_attributes {
491   return {
492     AutoCommit => 1,
493     RaiseError => 1,
494     PrintError => 0,
495   };
496 }
497
498 =head2 on_connect_do
499
500 This method is deprecated in favour of setting via L</connect_info>.
501
502 =cut
503
504 =head2 on_disconnect_do
505
506 This method is deprecated in favour of setting via L</connect_info>.
507
508 =cut
509
510 sub _parse_connect_do {
511   my ($self, $type) = @_;
512
513   my $val = $self->$type;
514   return () if not defined $val;
515
516   my @res;
517
518   if (not ref($val)) {
519     push @res, [ 'do_sql', $val ];
520   } elsif (ref($val) eq 'CODE') {
521     push @res, $val;
522   } elsif (ref($val) eq 'ARRAY') {
523     push @res, map { [ 'do_sql', $_ ] } @$val;
524   } else {
525     $self->throw_exception("Invalid type for $type: ".ref($val));
526   }
527
528   return \@res;
529 }
530
531 =head2 dbh_do
532
533 Arguments: ($subref | $method_name), @extra_coderef_args?
534
535 Execute the given $subref or $method_name using the new exception-based
536 connection management.
537
538 The first two arguments will be the storage object that C<dbh_do> was called
539 on and a database handle to use.  Any additional arguments will be passed
540 verbatim to the called subref as arguments 2 and onwards.
541
542 Using this (instead of $self->_dbh or $self->dbh) ensures correct
543 exception handling and reconnection (or failover in future subclasses).
544
545 Your subref should have no side-effects outside of the database, as
546 there is the potential for your subref to be partially double-executed
547 if the database connection was stale/dysfunctional.
548
549 Example:
550
551   my @stuff = $schema->storage->dbh_do(
552     sub {
553       my ($storage, $dbh, @cols) = @_;
554       my $cols = join(q{, }, @cols);
555       $dbh->selectrow_array("SELECT $cols FROM foo");
556     },
557     @column_list
558   );
559
560 =cut
561
562 sub dbh_do {
563   my $self = shift;
564   my $code = shift;
565
566   my $dbh = $self->_get_dbh;
567
568   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
569       || $self->{transaction_depth};
570
571   local $self->{_in_dbh_do} = 1;
572
573   my @result;
574   my $want_array = wantarray;
575
576   eval {
577
578     if($want_array) {
579         @result = $self->$code($dbh, @_);
580     }
581     elsif(defined $want_array) {
582         $result[0] = $self->$code($dbh, @_);
583     }
584     else {
585         $self->$code($dbh, @_);
586     }
587   };
588
589   # ->connected might unset $@ - copy
590   my $exception = $@;
591   if(!$exception) { return $want_array ? @result : $result[0] }
592
593   $self->throw_exception($exception) if $self->connected;
594
595   # We were not connected - reconnect and retry, but let any
596   #  exception fall right through this time
597   carp "Retrying $code after catching disconnected exception: $exception"
598     if $ENV{DBIC_DBIRETRY_DEBUG};
599   $self->_populate_dbh;
600   $self->$code($self->_dbh, @_);
601 }
602
603 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
604 # It also informs dbh_do to bypass itself while under the direction of txn_do,
605 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
606 sub txn_do {
607   my $self = shift;
608   my $coderef = shift;
609
610   ref $coderef eq 'CODE' or $self->throw_exception
611     ('$coderef must be a CODE reference');
612
613   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
614
615   local $self->{_in_dbh_do} = 1;
616
617   my @result;
618   my $want_array = wantarray;
619
620   my $tried = 0;
621   while(1) {
622     eval {
623       $self->_get_dbh;
624
625       $self->txn_begin;
626       if($want_array) {
627           @result = $coderef->(@_);
628       }
629       elsif(defined $want_array) {
630           $result[0] = $coderef->(@_);
631       }
632       else {
633           $coderef->(@_);
634       }
635       $self->txn_commit;
636     };
637
638     # ->connected might unset $@ - copy
639     my $exception = $@;
640     if(!$exception) { return $want_array ? @result : $result[0] }
641
642     if($tried++ || $self->connected) {
643       eval { $self->txn_rollback };
644       my $rollback_exception = $@;
645       if($rollback_exception) {
646         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
647         $self->throw_exception($exception)  # propagate nested rollback
648           if $rollback_exception =~ /$exception_class/;
649
650         $self->throw_exception(
651           "Transaction aborted: ${exception}. "
652           . "Rollback failed: ${rollback_exception}"
653         );
654       }
655       $self->throw_exception($exception)
656     }
657
658     # We were not connected, and was first try - reconnect and retry
659     # via the while loop
660     carp "Retrying $coderef after catching disconnected exception: $exception"
661       if $ENV{DBIC_DBIRETRY_DEBUG};
662     $self->_populate_dbh;
663   }
664 }
665
666 =head2 disconnect
667
668 Our C<disconnect> method also performs a rollback first if the
669 database is not in C<AutoCommit> mode.
670
671 =cut
672
673 sub disconnect {
674   my ($self) = @_;
675
676   if( $self->_dbh ) {
677     my @actions;
678
679     push @actions, ( $self->on_disconnect_call || () );
680     push @actions, $self->_parse_connect_do ('on_disconnect_do');
681
682     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
683
684     $self->_dbh->rollback unless $self->_dbh_autocommit;
685     $self->_dbh->disconnect;
686     $self->_dbh(undef);
687     $self->{_dbh_gen}++;
688   }
689 }
690
691 =head2 with_deferred_fk_checks
692
693 =over 4
694
695 =item Arguments: C<$coderef>
696
697 =item Return Value: The return value of $coderef
698
699 =back
700
701 Storage specific method to run the code ref with FK checks deferred or
702 in MySQL's case disabled entirely.
703
704 =cut
705
706 # Storage subclasses should override this
707 sub with_deferred_fk_checks {
708   my ($self, $sub) = @_;
709
710   $sub->();
711 }
712
713 =head2 connected
714
715 =over
716
717 =item Arguments: none
718
719 =item Return Value: 1|0
720
721 =back
722
723 Verifies that the the current database handle is active and ready to execute
724 an SQL statement (i.e. the connection did not get stale, server is still
725 answering, etc.) This method is used internally by L</dbh>.
726
727 =cut
728
729 sub connected {
730   my $self = shift;
731   return 0 unless $self->_seems_connected;
732
733   #be on the safe side
734   local $self->_dbh->{RaiseError} = 1;
735
736   return $self->_ping;
737 }
738
739 sub _seems_connected {
740   my $self = shift;
741
742   my $dbh = $self->_dbh
743     or return 0;
744
745   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
746     $self->_dbh(undef);
747     $self->{_dbh_gen}++;
748     return 0;
749   }
750   else {
751     $self->_verify_pid;
752     return 0 if !$self->_dbh;
753   }
754
755   return $dbh->FETCH('Active');
756 }
757
758 sub _ping {
759   my $self = shift;
760
761   my $dbh = $self->_dbh or return 0;
762
763   return $dbh->ping;
764 }
765
766 # handle pid changes correctly
767 #  NOTE: assumes $self->_dbh is a valid $dbh
768 sub _verify_pid {
769   my ($self) = @_;
770
771   return if defined $self->_conn_pid && $self->_conn_pid == $$;
772
773   $self->_dbh->{InactiveDestroy} = 1;
774   $self->_dbh(undef);
775   $self->{_dbh_gen}++;
776
777   return;
778 }
779
780 sub ensure_connected {
781   my ($self) = @_;
782
783   unless ($self->connected) {
784     $self->_populate_dbh;
785   }
786 }
787
788 =head2 dbh
789
790 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
791 is guaranteed to be healthy by implicitly calling L</connected>, and if
792 necessary performing a reconnection before returning. Keep in mind that this
793 is very B<expensive> on some database engines. Consider using L<dbh_do>
794 instead.
795
796 =cut
797
798 sub dbh {
799   my ($self) = @_;
800
801   if (not $self->_dbh) {
802     $self->_populate_dbh;
803   } else {
804     $self->ensure_connected;
805   }
806   return $self->_dbh;
807 }
808
809 # this is the internal "get dbh or connect (don't check)" method
810 sub _get_dbh {
811   my $self = shift;
812   $self->_verify_pid if $self->_dbh;
813   $self->_populate_dbh unless $self->_dbh;
814   return $self->_dbh;
815 }
816
817 sub _sql_maker_args {
818     my ($self) = @_;
819
820     return (
821       bindtype=>'columns',
822       array_datatypes => 1,
823       limit_dialect => $self->_get_dbh,
824       %{$self->_sql_maker_opts}
825     );
826 }
827
828 sub sql_maker {
829   my ($self) = @_;
830   unless ($self->_sql_maker) {
831     my $sql_maker_class = $self->sql_maker_class;
832     $self->ensure_class_loaded ($sql_maker_class);
833     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
834   }
835   return $self->_sql_maker;
836 }
837
838 sub _rebless {}
839
840 sub _populate_dbh {
841   my ($self) = @_;
842
843   my @info = @{$self->_dbi_connect_info || []};
844   $self->_dbh(undef); # in case ->connected failed we might get sent here
845   $self->_dbh($self->_connect(@info));
846
847   $self->_conn_pid($$);
848   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
849
850   $self->_determine_driver;
851
852   # Always set the transaction depth on connect, since
853   #  there is no transaction in progress by definition
854   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
855
856   $self->_run_connection_actions unless $self->{_in_determine_driver};
857 }
858
859 sub _run_connection_actions {
860   my $self = shift;
861   my @actions;
862
863   push @actions, ( $self->on_connect_call || () );
864   push @actions, $self->_parse_connect_do ('on_connect_do');
865
866   $self->_do_connection_actions(connect_call_ => $_) for @actions;
867 }
868
869 sub _determine_driver {
870   my ($self) = @_;
871
872   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
873     my $started_unconnected = 0;
874     local $self->{_in_determine_driver} = 1;
875
876     if (ref($self) eq __PACKAGE__) {
877       my $driver;
878       if ($self->_dbh) { # we are connected
879         $driver = $self->_dbh->{Driver}{Name};
880       } else {
881         # if connect_info is a CODEREF, we have no choice but to connect
882         if (ref $self->_dbi_connect_info->[0] &&
883             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
884           $self->_populate_dbh;
885           $driver = $self->_dbh->{Driver}{Name};
886         }
887         else {
888           # try to use dsn to not require being connected, the driver may still
889           # force a connection in _rebless to determine version
890           ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
891           $started_unconnected = 1;
892         }
893       }
894
895       my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
896       if ($self->load_optional_class($storage_class)) {
897         mro::set_mro($storage_class, 'c3');
898         bless $self, $storage_class;
899         $self->_rebless();
900       }
901     }
902
903     $self->_driver_determined(1);
904
905     $self->_run_connection_actions
906         if $started_unconnected && defined $self->_dbh;
907   }
908 }
909
910 sub _do_connection_actions {
911   my $self          = shift;
912   my $method_prefix = shift;
913   my $call          = shift;
914
915   if (not ref($call)) {
916     my $method = $method_prefix . $call;
917     $self->$method(@_);
918   } elsif (ref($call) eq 'CODE') {
919     $self->$call(@_);
920   } elsif (ref($call) eq 'ARRAY') {
921     if (ref($call->[0]) ne 'ARRAY') {
922       $self->_do_connection_actions($method_prefix, $_) for @$call;
923     } else {
924       $self->_do_connection_actions($method_prefix, @$_) for @$call;
925     }
926   } else {
927     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
928   }
929
930   return $self;
931 }
932
933 sub connect_call_do_sql {
934   my $self = shift;
935   $self->_do_query(@_);
936 }
937
938 sub disconnect_call_do_sql {
939   my $self = shift;
940   $self->_do_query(@_);
941 }
942
943 # override in db-specific backend when necessary
944 sub connect_call_datetime_setup { 1 }
945
946 sub _do_query {
947   my ($self, $action) = @_;
948
949   if (ref $action eq 'CODE') {
950     $action = $action->($self);
951     $self->_do_query($_) foreach @$action;
952   }
953   else {
954     # Most debuggers expect ($sql, @bind), so we need to exclude
955     # the attribute hash which is the second argument to $dbh->do
956     # furthermore the bind values are usually to be presented
957     # as named arrayref pairs, so wrap those here too
958     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
959     my $sql = shift @do_args;
960     my $attrs = shift @do_args;
961     my @bind = map { [ undef, $_ ] } @do_args;
962
963     $self->_query_start($sql, @bind);
964     $self->_get_dbh->do($sql, $attrs, @do_args);
965     $self->_query_end($sql, @bind);
966   }
967
968   return $self;
969 }
970
971 sub _connect {
972   my ($self, @info) = @_;
973
974   $self->throw_exception("You failed to provide any connection info")
975     if !@info;
976
977   my ($old_connect_via, $dbh);
978
979   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
980     $old_connect_via = $DBI::connect_via;
981     $DBI::connect_via = 'connect';
982   }
983
984   eval {
985     if(ref $info[0] eq 'CODE') {
986        $dbh = &{$info[0]}
987     }
988     else {
989        $dbh = DBI->connect(@info);
990     }
991
992     if($dbh && !$self->unsafe) {
993       my $weak_self = $self;
994       Scalar::Util::weaken($weak_self);
995       $dbh->{HandleError} = sub {
996           if ($weak_self) {
997             $weak_self->throw_exception("DBI Exception: $_[0]");
998           }
999           else {
1000             croak ("DBI Exception: $_[0]");
1001           }
1002       };
1003       $dbh->{ShowErrorStatement} = 1;
1004       $dbh->{RaiseError} = 1;
1005       $dbh->{PrintError} = 0;
1006     }
1007   };
1008
1009   $DBI::connect_via = $old_connect_via if $old_connect_via;
1010
1011   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1012     if !$dbh || $@;
1013
1014   $self->_dbh_autocommit($dbh->{AutoCommit});
1015
1016   $dbh;
1017 }
1018
1019 sub svp_begin {
1020   my ($self, $name) = @_;
1021
1022   $name = $self->_svp_generate_name
1023     unless defined $name;
1024
1025   $self->throw_exception ("You can't use savepoints outside a transaction")
1026     if $self->{transaction_depth} == 0;
1027
1028   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1029     unless $self->can('_svp_begin');
1030
1031   push @{ $self->{savepoints} }, $name;
1032
1033   $self->debugobj->svp_begin($name) if $self->debug;
1034
1035   return $self->_svp_begin($name);
1036 }
1037
1038 sub svp_release {
1039   my ($self, $name) = @_;
1040
1041   $self->throw_exception ("You can't use savepoints outside a transaction")
1042     if $self->{transaction_depth} == 0;
1043
1044   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1045     unless $self->can('_svp_release');
1046
1047   if (defined $name) {
1048     $self->throw_exception ("Savepoint '$name' does not exist")
1049       unless grep { $_ eq $name } @{ $self->{savepoints} };
1050
1051     # Dig through the stack until we find the one we are releasing.  This keeps
1052     # the stack up to date.
1053     my $svp;
1054
1055     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1056   } else {
1057     $name = pop @{ $self->{savepoints} };
1058   }
1059
1060   $self->debugobj->svp_release($name) if $self->debug;
1061
1062   return $self->_svp_release($name);
1063 }
1064
1065 sub svp_rollback {
1066   my ($self, $name) = @_;
1067
1068   $self->throw_exception ("You can't use savepoints outside a transaction")
1069     if $self->{transaction_depth} == 0;
1070
1071   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1072     unless $self->can('_svp_rollback');
1073
1074   if (defined $name) {
1075       # If they passed us a name, verify that it exists in the stack
1076       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1077           $self->throw_exception("Savepoint '$name' does not exist!");
1078       }
1079
1080       # Dig through the stack until we find the one we are releasing.  This keeps
1081       # the stack up to date.
1082       while(my $s = pop(@{ $self->{savepoints} })) {
1083           last if($s eq $name);
1084       }
1085       # Add the savepoint back to the stack, as a rollback doesn't remove the
1086       # named savepoint, only everything after it.
1087       push(@{ $self->{savepoints} }, $name);
1088   } else {
1089       # We'll assume they want to rollback to the last savepoint
1090       $name = $self->{savepoints}->[-1];
1091   }
1092
1093   $self->debugobj->svp_rollback($name) if $self->debug;
1094
1095   return $self->_svp_rollback($name);
1096 }
1097
1098 sub _svp_generate_name {
1099     my ($self) = @_;
1100
1101     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1102 }
1103
1104 sub txn_begin {
1105   my $self = shift;
1106   if($self->{transaction_depth} == 0) {
1107     $self->debugobj->txn_begin()
1108       if $self->debug;
1109
1110     # being here implies we have AutoCommit => 1
1111     # if the user is utilizing txn_do - good for
1112     # him, otherwise we need to ensure that the
1113     # $dbh is healthy on BEGIN
1114     my $dbh_method = $self->{_in_dbh_do} ? '_dbh' : 'dbh';
1115     $self->$dbh_method->begin_work;
1116
1117   } elsif ($self->auto_savepoint) {
1118     $self->svp_begin;
1119   }
1120   $self->{transaction_depth}++;
1121 }
1122
1123 sub txn_commit {
1124   my $self = shift;
1125   if ($self->{transaction_depth} == 1) {
1126     my $dbh = $self->_dbh;
1127     $self->debugobj->txn_commit()
1128       if ($self->debug);
1129     $dbh->commit;
1130     $self->{transaction_depth} = 0
1131       if $self->_dbh_autocommit;
1132   }
1133   elsif($self->{transaction_depth} > 1) {
1134     $self->{transaction_depth}--;
1135     $self->svp_release
1136       if $self->auto_savepoint;
1137   }
1138 }
1139
1140 sub txn_rollback {
1141   my $self = shift;
1142   my $dbh = $self->_dbh;
1143   eval {
1144     if ($self->{transaction_depth} == 1) {
1145       $self->debugobj->txn_rollback()
1146         if ($self->debug);
1147       $self->{transaction_depth} = 0
1148         if $self->_dbh_autocommit;
1149       $dbh->rollback;
1150     }
1151     elsif($self->{transaction_depth} > 1) {
1152       $self->{transaction_depth}--;
1153       if ($self->auto_savepoint) {
1154         $self->svp_rollback;
1155         $self->svp_release;
1156       }
1157     }
1158     else {
1159       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1160     }
1161   };
1162   if ($@) {
1163     my $error = $@;
1164     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1165     $error =~ /$exception_class/ and $self->throw_exception($error);
1166     # ensure that a failed rollback resets the transaction depth
1167     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1168     $self->throw_exception($error);
1169   }
1170 }
1171
1172 # This used to be the top-half of _execute.  It was split out to make it
1173 #  easier to override in NoBindVars without duping the rest.  It takes up
1174 #  all of _execute's args, and emits $sql, @bind.
1175 sub _prep_for_execute {
1176   my ($self, $op, $extra_bind, $ident, $args) = @_;
1177
1178   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1179     $ident = $ident->from();
1180   }
1181
1182   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1183
1184   unshift(@bind,
1185     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1186       if $extra_bind;
1187   return ($sql, \@bind);
1188 }
1189
1190
1191 sub _fix_bind_params {
1192     my ($self, @bind) = @_;
1193
1194     ### Turn @bind from something like this:
1195     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1196     ### to this:
1197     ###   ( "'1'", "'1'", "'3'" )
1198     return
1199         map {
1200             if ( defined( $_ && $_->[1] ) ) {
1201                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1202             }
1203             else { q{'NULL'}; }
1204         } @bind;
1205 }
1206
1207 sub _query_start {
1208     my ( $self, $sql, @bind ) = @_;
1209
1210     if ( $self->debug ) {
1211         @bind = $self->_fix_bind_params(@bind);
1212
1213         $self->debugobj->query_start( $sql, @bind );
1214     }
1215 }
1216
1217 sub _query_end {
1218     my ( $self, $sql, @bind ) = @_;
1219
1220     if ( $self->debug ) {
1221         @bind = $self->_fix_bind_params(@bind);
1222         $self->debugobj->query_end( $sql, @bind );
1223     }
1224 }
1225
1226 sub _dbh_execute {
1227   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1228
1229   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1230
1231   $self->_query_start( $sql, @$bind );
1232
1233   my $sth = $self->sth($sql,$op);
1234
1235   my $placeholder_index = 1;
1236
1237   foreach my $bound (@$bind) {
1238     my $attributes = {};
1239     my($column_name, @data) = @$bound;
1240
1241     if ($bind_attributes) {
1242       $attributes = $bind_attributes->{$column_name}
1243       if defined $bind_attributes->{$column_name};
1244     }
1245
1246     foreach my $data (@data) {
1247       my $ref = ref $data;
1248       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1249
1250       $sth->bind_param($placeholder_index, $data, $attributes);
1251       $placeholder_index++;
1252     }
1253   }
1254
1255   # Can this fail without throwing an exception anyways???
1256   my $rv = $sth->execute();
1257   $self->throw_exception($sth->errstr) if !$rv;
1258
1259   $self->_query_end( $sql, @$bind );
1260
1261   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1262 }
1263
1264 sub _execute {
1265     my $self = shift;
1266     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1267 }
1268
1269 sub insert {
1270   my ($self, $source, $to_insert) = @_;
1271
1272 # redispatch to insert method of storage we reblessed into, if necessary
1273   if (not $self->_driver_determined) {
1274     $self->_determine_driver;
1275     goto $self->can('insert');
1276   }
1277
1278   my $ident = $source->from;
1279   my $bind_attributes = $self->source_bind_attributes($source);
1280
1281   my $updated_cols = {};
1282
1283   foreach my $col ( $source->columns ) {
1284     if ( !defined $to_insert->{$col} ) {
1285       my $col_info = $source->column_info($col);
1286
1287       if ( $col_info->{auto_nextval} ) {
1288         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1289           'nextval',
1290           $col_info->{sequence} ||
1291             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
1292         );
1293       }
1294     }
1295   }
1296
1297   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1298
1299   return $updated_cols;
1300 }
1301
1302 ## Still not quite perfect, and EXPERIMENTAL
1303 ## Currently it is assumed that all values passed will be "normal", i.e. not
1304 ## scalar refs, or at least, all the same type as the first set, the statement is
1305 ## only prepped once.
1306 sub insert_bulk {
1307   my ($self, $source, $cols, $data) = @_;
1308
1309 # redispatch to insert_bulk method of storage we reblessed into, if necessary
1310   if (not $self->_driver_determined) {
1311     $self->_determine_driver;
1312     goto $self->can('insert_bulk');
1313   }
1314
1315   my %colvalues;
1316   my $table = $source->from;
1317   @colvalues{@$cols} = (0..$#$cols);
1318   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1319
1320   $self->_query_start( $sql, @bind );
1321   my $sth = $self->sth($sql);
1322
1323 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1324
1325   ## This must be an arrayref, else nothing works!
1326   my $tuple_status = [];
1327
1328   ## Get the bind_attributes, if any exist
1329   my $bind_attributes = $self->source_bind_attributes($source);
1330
1331   ## Bind the values and execute
1332   my $placeholder_index = 1;
1333
1334   foreach my $bound (@bind) {
1335
1336     my $attributes = {};
1337     my ($column_name, $data_index) = @$bound;
1338
1339     if( $bind_attributes ) {
1340       $attributes = $bind_attributes->{$column_name}
1341       if defined $bind_attributes->{$column_name};
1342     }
1343
1344     my @data = map { $_->[$data_index] } @$data;
1345
1346     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1347     $placeholder_index++;
1348   }
1349   my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
1350   if (my $err = $@) {
1351     my $i = 0;
1352     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1353
1354     $self->throw_exception($sth->errstr || "Unexpected populate error: $err")
1355       if ($i > $#$tuple_status);
1356
1357     require Data::Dumper;
1358     local $Data::Dumper::Terse = 1;
1359     local $Data::Dumper::Indent = 1;
1360     local $Data::Dumper::Useqq = 1;
1361     local $Data::Dumper::Quotekeys = 0;
1362     local $Data::Dumper::Sortkeys = 1;
1363
1364     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1365       $tuple_status->[$i][1],
1366       Data::Dumper::Dumper(
1367         { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) }
1368       ),
1369     );
1370   }
1371   $self->throw_exception($sth->errstr) if !$rv;
1372
1373   $self->_query_end( $sql, @bind );
1374   return (wantarray ? ($rv, $sth, @bind) : $rv);
1375 }
1376
1377 sub update {
1378   my $self = shift @_;
1379   my $source = shift @_;
1380   $self->_determine_driver;
1381   my $bind_attributes = $self->source_bind_attributes($source);
1382
1383   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1384 }
1385
1386
1387 sub delete {
1388   my $self = shift @_;
1389   my $source = shift @_;
1390   $self->_determine_driver;
1391   my $bind_attrs = $self->source_bind_attributes($source);
1392
1393   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1394 }
1395
1396 # We were sent here because the $rs contains a complex search
1397 # which will require a subquery to select the correct rows
1398 # (i.e. joined or limited resultsets)
1399 #
1400 # Genarating a single PK column subquery is trivial and supported
1401 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1402 # Look at _multipk_update_delete()
1403 sub _subq_update_delete {
1404   my $self = shift;
1405   my ($rs, $op, $values) = @_;
1406
1407   my $rsrc = $rs->result_source;
1408
1409   # we already check this, but double check naively just in case. Should be removed soon
1410   my $sel = $rs->_resolved_attrs->{select};
1411   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1412   my @pcols = $rsrc->primary_columns;
1413   if (@$sel != @pcols) {
1414     $self->throw_exception (
1415       'Subquery update/delete can not be called on resultsets selecting a'
1416      .' number of columns different than the number of primary keys'
1417     );
1418   }
1419
1420   if (@pcols == 1) {
1421     return $self->$op (
1422       $rsrc,
1423       $op eq 'update' ? $values : (),
1424       { $pcols[0] => { -in => $rs->as_query } },
1425     );
1426   }
1427
1428   else {
1429     return $self->_multipk_update_delete (@_);
1430   }
1431 }
1432
1433 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1434 # resultset update/delete involving subqueries. So by default resort
1435 # to simple (and inefficient) delete_all style per-row opearations,
1436 # while allowing specific storages to override this with a faster
1437 # implementation.
1438 #
1439 sub _multipk_update_delete {
1440   return shift->_per_row_update_delete (@_);
1441 }
1442
1443 # This is the default loop used to delete/update rows for multi PK
1444 # resultsets, and used by mysql exclusively (because it can't do anything
1445 # else).
1446 #
1447 # We do not use $row->$op style queries, because resultset update/delete
1448 # is not expected to cascade (this is what delete_all/update_all is for).
1449 #
1450 # There should be no race conditions as the entire operation is rolled
1451 # in a transaction.
1452 #
1453 sub _per_row_update_delete {
1454   my $self = shift;
1455   my ($rs, $op, $values) = @_;
1456
1457   my $rsrc = $rs->result_source;
1458   my @pcols = $rsrc->primary_columns;
1459
1460   my $guard = $self->txn_scope_guard;
1461
1462   # emulate the return value of $sth->execute for non-selects
1463   my $row_cnt = '0E0';
1464
1465   my $subrs_cur = $rs->cursor;
1466   while (my @pks = $subrs_cur->next) {
1467
1468     my $cond;
1469     for my $i (0.. $#pcols) {
1470       $cond->{$pcols[$i]} = $pks[$i];
1471     }
1472
1473     $self->$op (
1474       $rsrc,
1475       $op eq 'update' ? $values : (),
1476       $cond,
1477     );
1478
1479     $row_cnt++;
1480   }
1481
1482   $guard->commit;
1483
1484   return $row_cnt;
1485 }
1486
1487 sub _select {
1488   my $self = shift;
1489
1490   # localization is neccessary as
1491   # 1) there is no infrastructure to pass this around before SQLA2
1492   # 2) _select_args sets it and _prep_for_execute consumes it
1493   my $sql_maker = $self->sql_maker;
1494   local $sql_maker->{_dbic_rs_attrs};
1495
1496   return $self->_execute($self->_select_args(@_));
1497 }
1498
1499 sub _select_args_to_query {
1500   my $self = shift;
1501
1502   # localization is neccessary as
1503   # 1) there is no infrastructure to pass this around before SQLA2
1504   # 2) _select_args sets it and _prep_for_execute consumes it
1505   my $sql_maker = $self->sql_maker;
1506   local $sql_maker->{_dbic_rs_attrs};
1507
1508   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1509   #  = $self->_select_args($ident, $select, $cond, $attrs);
1510   my ($op, $bind, $ident, $bind_attrs, @args) =
1511     $self->_select_args(@_);
1512
1513   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1514   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1515   $prepared_bind ||= [];
1516
1517   return wantarray
1518     ? ($sql, $prepared_bind, $bind_attrs)
1519     : \[ "($sql)", @$prepared_bind ]
1520   ;
1521 }
1522
1523 sub _select_args {
1524   my ($self, $ident, $select, $where, $attrs) = @_;
1525
1526   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1527
1528   my $sql_maker = $self->sql_maker;
1529   $sql_maker->{_dbic_rs_attrs} = {
1530     %$attrs,
1531     select => $select,
1532     from => $ident,
1533     where => $where,
1534     $rs_alias
1535       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1536       : ()
1537     ,
1538   };
1539
1540   # calculate bind_attrs before possible $ident mangling
1541   my $bind_attrs = {};
1542   for my $alias (keys %$alias2source) {
1543     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1544     for my $col (keys %$bindtypes) {
1545
1546       my $fqcn = join ('.', $alias, $col);
1547       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1548
1549       # Unqialified column names are nice, but at the same time can be
1550       # rather ambiguous. What we do here is basically go along with
1551       # the loop, adding an unqualified column slot to $bind_attrs,
1552       # alongside the fully qualified name. As soon as we encounter
1553       # another column by that name (which would imply another table)
1554       # we unset the unqualified slot and never add any info to it
1555       # to avoid erroneous type binding. If this happens the users
1556       # only choice will be to fully qualify his column name
1557
1558       if (exists $bind_attrs->{$col}) {
1559         $bind_attrs->{$col} = {};
1560       }
1561       else {
1562         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1563       }
1564     }
1565   }
1566
1567   # adjust limits
1568   if (
1569     $attrs->{software_limit}
1570       ||
1571     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1572   ) {
1573     $attrs->{software_limit} = 1;
1574   }
1575   else {
1576     $self->throw_exception("rows attribute must be positive if present")
1577       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1578
1579     # MySQL actually recommends this approach.  I cringe.
1580     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1581   }
1582
1583   my @limit;
1584
1585   # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
1586   # otherwise delegate the limiting to the storage, unless software limit was requested
1587   if (
1588     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1589        ||
1590     ( $attrs->{group_by} && @{$attrs->{group_by}} &&
1591       $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
1592   ) {
1593     ($ident, $select, $where, $attrs)
1594       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1595   }
1596   elsif (! $attrs->{software_limit} ) {
1597     push @limit, $attrs->{rows}, $attrs->{offset};
1598   }
1599
1600 ###
1601   # This would be the point to deflate anything found in $where
1602   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1603   # expect a row object. And all we have is a resultsource (it is trivial
1604   # to extract deflator coderefs via $alias2source above).
1605   #
1606   # I don't see a way forward other than changing the way deflators are
1607   # invoked, and that's just bad...
1608 ###
1609
1610   my $order = { map
1611     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1612     (qw/order_by group_by having/ )
1613   };
1614
1615   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1616 }
1617
1618 #
1619 # This is the code producing joined subqueries like:
1620 # SELECT me.*, other.* FROM ( SELECT me.* FROM ... ) JOIN other ON ... 
1621 #
1622 sub _adjust_select_args_for_complex_prefetch {
1623   my ($self, $from, $select, $where, $attrs) = @_;
1624
1625   $self->throw_exception ('Nothing to prefetch... how did we get here?!')
1626     if not @{$attrs->{_prefetch_select}};
1627
1628   $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
1629     if (ref $from ne 'ARRAY' || ref $from->[0] ne 'HASH' || ref $from->[1] ne 'ARRAY');
1630
1631
1632   # generate inner/outer attribute lists, remove stuff that doesn't apply
1633   my $outer_attrs = { %$attrs };
1634   delete $outer_attrs->{$_} for qw/where bind rows offset group_by having/;
1635
1636   my $inner_attrs = { %$attrs };
1637   delete $inner_attrs->{$_} for qw/for collapse _prefetch_select _collapse_order_by select as/;
1638
1639
1640   # bring over all non-collapse-induced order_by into the inner query (if any)
1641   # the outer one will have to keep them all
1642   delete $inner_attrs->{order_by};
1643   if (my $ord_cnt = @{$outer_attrs->{order_by}} - @{$outer_attrs->{_collapse_order_by}} ) {
1644     $inner_attrs->{order_by} = [
1645       @{$outer_attrs->{order_by}}[ 0 .. $ord_cnt - 1]
1646     ];
1647   }
1648
1649
1650   # generate the inner/outer select lists
1651   # for inside we consider only stuff *not* brought in by the prefetch
1652   # on the outside we substitute any function for its alias
1653   my $outer_select = [ @$select ];
1654   my $inner_select = [];
1655   for my $i (0 .. ( @$outer_select - @{$outer_attrs->{_prefetch_select}} - 1) ) {
1656     my $sel = $outer_select->[$i];
1657
1658     if (ref $sel eq 'HASH' ) {
1659       $sel->{-as} ||= $attrs->{as}[$i];
1660       $outer_select->[$i] = join ('.', $attrs->{alias}, ($sel->{-as} || "inner_column_$i") );
1661     }
1662
1663     push @$inner_select, $sel;
1664   }
1665
1666   # normalize a copy of $from, so it will be easier to work with further
1667   # down (i.e. promote the initial hashref to an AoH)
1668   $from = [ @$from ];
1669   $from->[0] = [ $from->[0] ];
1670   my %original_join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
1671
1672
1673   # decide which parts of the join will remain in either part of
1674   # the outer/inner query
1675
1676   # First we compose a list of which aliases are used in restrictions
1677   # (i.e. conditions/order/grouping/etc). Since we do not have
1678   # introspectable SQLA, we fall back to ugly scanning of raw SQL for
1679   # WHERE, and for pieces of ORDER BY in order to determine which aliases
1680   # need to appear in the resulting sql.
1681   # It may not be very efficient, but it's a reasonable stop-gap
1682   # Also unqualified column names will not be considered, but more often
1683   # than not this is actually ok
1684   #
1685   # In the same loop we enumerate part of the selection aliases, as
1686   # it requires the same sqla hack for the time being
1687   my ($restrict_aliases, $select_aliases, $prefetch_aliases);
1688   {
1689     # produce stuff unquoted, so it can be scanned
1690     my $sql_maker = $self->sql_maker;
1691     local $sql_maker->{quote_char};
1692     my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1693     $sep = "\Q$sep\E";
1694
1695     my $non_prefetch_select_sql = $sql_maker->_recurse_fields ($inner_select);
1696     my $prefetch_select_sql = $sql_maker->_recurse_fields ($outer_attrs->{_prefetch_select});
1697     my $where_sql = $sql_maker->where ($where);
1698     my $group_by_sql = $sql_maker->_order_by({
1699       map { $_ => $inner_attrs->{$_} } qw/group_by having/
1700     });
1701     my @non_prefetch_order_by_chunks = (map
1702       { ref $_ ? $_->[0] : $_ }
1703       $sql_maker->_order_by_chunks ($inner_attrs->{order_by})
1704     );
1705
1706
1707     for my $alias (keys %original_join_info) {
1708       my $seen_re = qr/\b $alias $sep/x;
1709
1710       for my $piece ($where_sql, $group_by_sql, @non_prefetch_order_by_chunks ) {
1711         if ($piece =~ $seen_re) {
1712           $restrict_aliases->{$alias} = 1;
1713         }
1714       }
1715
1716       if ($non_prefetch_select_sql =~ $seen_re) {
1717           $select_aliases->{$alias} = 1;
1718       }
1719
1720       if ($prefetch_select_sql =~ $seen_re) {
1721           $prefetch_aliases->{$alias} = 1;
1722       }
1723
1724     }
1725   }
1726
1727   # Add any non-left joins to the restriction list (such joins are indeed restrictions)
1728   for my $j (values %original_join_info) {
1729     my $alias = $j->{-alias} or next;
1730     $restrict_aliases->{$alias} = 1 if (
1731       (not $j->{-join_type})
1732         or
1733       ($j->{-join_type} !~ /^left (?: \s+ outer)? $/xi)
1734     );
1735   }
1736
1737   # mark all join parents as mentioned
1738   # (e.g.  join => { cds => 'tracks' } - tracks will need to bring cds too )
1739   for my $collection ($restrict_aliases, $select_aliases) {
1740     for my $alias (keys %$collection) {
1741       $collection->{$_} = 1
1742         for (@{ $original_join_info{$alias}{-join_path} || [] });
1743     }
1744   }
1745
1746   # construct the inner $from for the subquery
1747   my %inner_joins = (map { %{$_ || {}} } ($restrict_aliases, $select_aliases) );
1748   my @inner_from;
1749   for my $j (@$from) {
1750     push @inner_from, $j if $inner_joins{$j->[0]{-alias}};
1751   }
1752
1753   # if a multi-type join was needed in the subquery ("multi" is indicated by
1754   # presence in {collapse}) - add a group_by to simulate the collapse in the subq
1755   unless ($inner_attrs->{group_by}) {
1756     for my $alias (keys %inner_joins) {
1757
1758       # the dot comes from some weirdness in collapse
1759       # remove after the rewrite
1760       if ($attrs->{collapse}{".$alias"}) {
1761         $inner_attrs->{group_by} ||= $inner_select;
1762         last;
1763       }
1764     }
1765   }
1766
1767   # demote the inner_from head
1768   $inner_from[0] = $inner_from[0][0];
1769
1770   # generate the subquery
1771   my $subq = $self->_select_args_to_query (
1772     \@inner_from,
1773     $inner_select,
1774     $where,
1775     $inner_attrs,
1776   );
1777
1778   my $subq_joinspec = {
1779     -alias => $attrs->{alias},
1780     -source_handle => $inner_from[0]{-source_handle},
1781     $attrs->{alias} => $subq,
1782   };
1783
1784   # Generate the outer from - this is relatively easy (really just replace
1785   # the join slot with the subquery), with a major caveat - we can not
1786   # join anything that is non-selecting (not part of the prefetch), but at
1787   # the same time is a multi-type relationship, as it will explode the result.
1788   #
1789   # There are two possibilities here
1790   # - either the join is non-restricting, in which case we simply throw it away
1791   # - it is part of the restrictions, in which case we need to collapse the outer
1792   #   result by tackling yet another group_by to the outside of the query
1793
1794   # so first generate the outer_from, up to the substitution point
1795   my @outer_from;
1796   while (my $j = shift @$from) {
1797     if ($j->[0]{-alias} eq $attrs->{alias}) { # time to swap
1798       push @outer_from, [
1799         $subq_joinspec,
1800         @{$j}[1 .. $#$j],
1801       ];
1802       last; # we'll take care of what's left in $from below
1803     }
1804     else {
1805       push @outer_from, $j;
1806     }
1807   }
1808
1809   # see what's left - throw away if not selecting/restricting
1810   # also throw in a group_by if restricting to guard against
1811   # cross-join explosions
1812   #
1813   while (my $j = shift @$from) {
1814     my $alias = $j->[0]{-alias};
1815
1816     if ($select_aliases->{$alias} || $prefetch_aliases->{$alias}) {
1817       push @outer_from, $j;
1818     }
1819     elsif ($restrict_aliases->{$alias}) {
1820       push @outer_from, $j;
1821
1822       # FIXME - this should be obviated by SQLA2, as I'll be able to 
1823       # have restrict_inner and restrict_outer... or something to that
1824       # effect... I think...
1825
1826       # FIXME2 - I can't find a clean way to determine if a particular join
1827       # is a multi - instead I am just treating everything as a potential
1828       # explosive join (ribasushi)
1829       #
1830       # if (my $handle = $j->[0]{-source_handle}) {
1831       #   my $rsrc = $handle->resolve;
1832       #   ... need to bail out of the following if this is not a multi,
1833       #       as it will be much easier on the db ...
1834
1835           $outer_attrs->{group_by} ||= $outer_select;
1836       # }
1837     }
1838   }
1839
1840   # demote the outer_from head
1841   $outer_from[0] = $outer_from[0][0];
1842
1843   # This is totally horrific - the $where ends up in both the inner and outer query
1844   # Unfortunately not much can be done until SQLA2 introspection arrives, and even
1845   # then if where conditions apply to the *right* side of the prefetch, you may have
1846   # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
1847   # the outer select to exclude joins you didin't want in the first place
1848   #
1849   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
1850   return (\@outer_from, $outer_select, $where, $outer_attrs);
1851 }
1852
1853 sub _resolve_ident_sources {
1854   my ($self, $ident) = @_;
1855
1856   my $alias2source = {};
1857   my $rs_alias;
1858
1859   # the reason this is so contrived is that $ident may be a {from}
1860   # structure, specifying multiple tables to join
1861   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1862     # this is compat mode for insert/update/delete which do not deal with aliases
1863     $alias2source->{me} = $ident;
1864     $rs_alias = 'me';
1865   }
1866   elsif (ref $ident eq 'ARRAY') {
1867
1868     for (@$ident) {
1869       my $tabinfo;
1870       if (ref $_ eq 'HASH') {
1871         $tabinfo = $_;
1872         $rs_alias = $tabinfo->{-alias};
1873       }
1874       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
1875         $tabinfo = $_->[0];
1876       }
1877
1878       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
1879         if ($tabinfo->{-source_handle});
1880     }
1881   }
1882
1883   return ($alias2source, $rs_alias);
1884 }
1885
1886 # Takes $ident, \@column_names
1887 #
1888 # returns { $column_name => \%column_info, ... }
1889 # also note: this adds -result_source => $rsrc to the column info
1890 #
1891 # usage:
1892 #   my $col_sources = $self->_resolve_column_info($ident, @column_names);
1893 sub _resolve_column_info {
1894   my ($self, $ident, $colnames) = @_;
1895   my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
1896
1897   my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1898   $sep = "\Q$sep\E";
1899
1900   my (%return, %seen_cols);
1901
1902   # compile a global list of column names, to be able to properly
1903   # disambiguate unqualified column names (if at all possible)
1904   for my $alias (keys %$alias2src) {
1905     my $rsrc = $alias2src->{$alias};
1906     for my $colname ($rsrc->columns) {
1907       push @{$seen_cols{$colname}}, $alias;
1908     }
1909   }
1910
1911   COLUMN:
1912   foreach my $col (@$colnames) {
1913     my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
1914
1915     unless ($alias) {
1916       # see if the column was seen exactly once (so we know which rsrc it came from)
1917       if ($seen_cols{$colname} and @{$seen_cols{$colname}} == 1) {
1918         $alias = $seen_cols{$colname}[0];
1919       }
1920       else {
1921         next COLUMN;
1922       }
1923     }
1924
1925     my $rsrc = $alias2src->{$alias};
1926     $return{$col} = $rsrc && {
1927       %{$rsrc->column_info($colname)},
1928       -result_source => $rsrc,
1929       -source_alias => $alias,
1930     };
1931   }
1932
1933   return \%return;
1934 }
1935
1936 # Returns a counting SELECT for a simple count
1937 # query. Abstracted so that a storage could override
1938 # this to { count => 'firstcol' } or whatever makes
1939 # sense as a performance optimization
1940 sub _count_select {
1941   #my ($self, $source, $rs_attrs) = @_;
1942   return { count => '*' };
1943 }
1944
1945 # Returns a SELECT which will end up in the subselect
1946 # There may or may not be a group_by, as the subquery
1947 # might have been called to accomodate a limit
1948 #
1949 # Most databases would be happy with whatever ends up
1950 # here, but some choke in various ways.
1951 #
1952 sub _subq_count_select {
1953   my ($self, $source, $rs_attrs) = @_;
1954   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
1955
1956   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1957   return @pcols ? \@pcols : [ 1 ];
1958 }
1959
1960
1961 sub source_bind_attributes {
1962   my ($self, $source) = @_;
1963
1964   my $bind_attributes;
1965   foreach my $column ($source->columns) {
1966
1967     my $data_type = $source->column_info($column)->{data_type} || '';
1968     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1969      if $data_type;
1970   }
1971
1972   return $bind_attributes;
1973 }
1974
1975 =head2 select
1976
1977 =over 4
1978
1979 =item Arguments: $ident, $select, $condition, $attrs
1980
1981 =back
1982
1983 Handle a SQL select statement.
1984
1985 =cut
1986
1987 sub select {
1988   my $self = shift;
1989   my ($ident, $select, $condition, $attrs) = @_;
1990   return $self->cursor_class->new($self, \@_, $attrs);
1991 }
1992
1993 sub select_single {
1994   my $self = shift;
1995   my ($rv, $sth, @bind) = $self->_select(@_);
1996   my @row = $sth->fetchrow_array;
1997   my @nextrow = $sth->fetchrow_array if @row;
1998   if(@row && @nextrow) {
1999     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2000   }
2001   # Need to call finish() to work round broken DBDs
2002   $sth->finish();
2003   return @row;
2004 }
2005
2006 =head2 sth
2007
2008 =over 4
2009
2010 =item Arguments: $sql
2011
2012 =back
2013
2014 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2015
2016 =cut
2017
2018 sub _dbh_sth {
2019   my ($self, $dbh, $sql) = @_;
2020
2021   # 3 is the if_active parameter which avoids active sth re-use
2022   my $sth = $self->disable_sth_caching
2023     ? $dbh->prepare($sql)
2024     : $dbh->prepare_cached($sql, {}, 3);
2025
2026   # XXX You would think RaiseError would make this impossible,
2027   #  but apparently that's not true :(
2028   $self->throw_exception($dbh->errstr) if !$sth;
2029
2030   $sth;
2031 }
2032
2033 sub sth {
2034   my ($self, $sql) = @_;
2035   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2036 }
2037
2038 sub _dbh_columns_info_for {
2039   my ($self, $dbh, $table) = @_;
2040
2041   if ($dbh->can('column_info')) {
2042     my %result;
2043     eval {
2044       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2045       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2046       $sth->execute();
2047       while ( my $info = $sth->fetchrow_hashref() ){
2048         my %column_info;
2049         $column_info{data_type}   = $info->{TYPE_NAME};
2050         $column_info{size}      = $info->{COLUMN_SIZE};
2051         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2052         $column_info{default_value} = $info->{COLUMN_DEF};
2053         my $col_name = $info->{COLUMN_NAME};
2054         $col_name =~ s/^\"(.*)\"$/$1/;
2055
2056         $result{$col_name} = \%column_info;
2057       }
2058     };
2059     return \%result if !$@ && scalar keys %result;
2060   }
2061
2062   my %result;
2063   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2064   $sth->execute;
2065   my @columns = @{$sth->{NAME_lc}};
2066   for my $i ( 0 .. $#columns ){
2067     my %column_info;
2068     $column_info{data_type} = $sth->{TYPE}->[$i];
2069     $column_info{size} = $sth->{PRECISION}->[$i];
2070     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2071
2072     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2073       $column_info{data_type} = $1;
2074       $column_info{size}    = $2;
2075     }
2076
2077     $result{$columns[$i]} = \%column_info;
2078   }
2079   $sth->finish;
2080
2081   foreach my $col (keys %result) {
2082     my $colinfo = $result{$col};
2083     my $type_num = $colinfo->{data_type};
2084     my $type_name;
2085     if(defined $type_num && $dbh->can('type_info')) {
2086       my $type_info = $dbh->type_info($type_num);
2087       $type_name = $type_info->{TYPE_NAME} if $type_info;
2088       $colinfo->{data_type} = $type_name if $type_name;
2089     }
2090   }
2091
2092   return \%result;
2093 }
2094
2095 sub columns_info_for {
2096   my ($self, $table) = @_;
2097   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2098 }
2099
2100 =head2 last_insert_id
2101
2102 Return the row id of the last insert.
2103
2104 =cut
2105
2106 sub _dbh_last_insert_id {
2107     # All Storage's need to register their own _dbh_last_insert_id
2108     # the old SQLite-based method was highly inappropriate
2109
2110     my $self = shift;
2111     my $class = ref $self;
2112     $self->throw_exception (<<EOE);
2113
2114 No _dbh_last_insert_id() method found in $class.
2115 Since the method of obtaining the autoincrement id of the last insert
2116 operation varies greatly between different databases, this method must be
2117 individually implemented for every storage class.
2118 EOE
2119 }
2120
2121 sub last_insert_id {
2122   my $self = shift;
2123   $self->_dbh_last_insert_id ($self->_dbh, @_);
2124 }
2125
2126 =head2 _native_data_type
2127
2128 =over 4
2129
2130 =item Arguments: $type_name
2131
2132 =back
2133
2134 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2135 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2136 L<::Sybase|DBIx::Class::Storage::DBI::Sybase>.
2137
2138 The default implementation returns C<undef>, implement in your Storage driver if
2139 you need this functionality.
2140
2141 Should map types from other databases to the native RDBMS type, for example
2142 C<VARCHAR2> to C<VARCHAR>.
2143
2144 Types with modifiers should map to the underlying data type. For example,
2145 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2146
2147 Composite types should map to the container type, for example
2148 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2149
2150 =cut
2151
2152 sub _native_data_type {
2153   #my ($self, $data_type) = @_;
2154   return undef
2155 }
2156
2157 =head2 sqlt_type
2158
2159 Returns the database driver name.
2160
2161 =cut
2162
2163 sub sqlt_type {
2164   my ($self) = @_;
2165
2166   if (not $self->_driver_determined) {
2167     $self->_determine_driver;
2168     goto $self->can ('sqlt_type');
2169   }
2170
2171   $self->_get_dbh->{Driver}->{Name};
2172 }
2173
2174 =head2 bind_attribute_by_data_type
2175
2176 Given a datatype from column info, returns a database specific bind
2177 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2178 let the database planner just handle it.
2179
2180 Generally only needed for special case column types, like bytea in postgres.
2181
2182 =cut
2183
2184 sub bind_attribute_by_data_type {
2185     return;
2186 }
2187
2188 =head2 is_datatype_numeric
2189
2190 Given a datatype from column_info, returns a boolean value indicating if
2191 the current RDBMS considers it a numeric value. This controls how
2192 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2193 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2194 be performed instead of the usual C<eq>.
2195
2196 =cut
2197
2198 sub is_datatype_numeric {
2199   my ($self, $dt) = @_;
2200
2201   return 0 unless $dt;
2202
2203   return $dt =~ /^ (?:
2204     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2205   ) $/ix;
2206 }
2207
2208
2209 =head2 create_ddl_dir (EXPERIMENTAL)
2210
2211 =over 4
2212
2213 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2214
2215 =back
2216
2217 Creates a SQL file based on the Schema, for each of the specified
2218 database engines in C<\@databases> in the given directory.
2219 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2220
2221 Given a previous version number, this will also create a file containing
2222 the ALTER TABLE statements to transform the previous schema into the
2223 current one. Note that these statements may contain C<DROP TABLE> or
2224 C<DROP COLUMN> statements that can potentially destroy data.
2225
2226 The file names are created using the C<ddl_filename> method below, please
2227 override this method in your schema if you would like a different file
2228 name format. For the ALTER file, the same format is used, replacing
2229 $version in the name with "$preversion-$version".
2230
2231 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2232 The most common value for this would be C<< { add_drop_table => 1 } >>
2233 to have the SQL produced include a C<DROP TABLE> statement for each table
2234 created. For quoting purposes supply C<quote_table_names> and
2235 C<quote_field_names>.
2236
2237 If no arguments are passed, then the following default values are assumed:
2238
2239 =over 4
2240
2241 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2242
2243 =item version    - $schema->schema_version
2244
2245 =item directory  - './'
2246
2247 =item preversion - <none>
2248
2249 =back
2250
2251 By default, C<\%sqlt_args> will have
2252
2253  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2254
2255 merged with the hash passed in. To disable any of those features, pass in a
2256 hashref like the following
2257
2258  { ignore_constraint_names => 0, # ... other options }
2259
2260
2261 Note that this feature is currently EXPERIMENTAL and may not work correctly
2262 across all databases, or fully handle complex relationships.
2263
2264 WARNING: Please check all SQL files created, before applying them.
2265
2266 =cut
2267
2268 sub create_ddl_dir {
2269   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2270
2271   if(!$dir || !-d $dir) {
2272     carp "No directory given, using ./\n";
2273     $dir = "./";
2274   }
2275   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2276   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2277
2278   my $schema_version = $schema->schema_version || '1.x';
2279   $version ||= $schema_version;
2280
2281   $sqltargs = {
2282     add_drop_table => 1,
2283     ignore_constraint_names => 1,
2284     ignore_index_names => 1,
2285     %{$sqltargs || {}}
2286   };
2287
2288   $self->throw_exception("Can't create a ddl file without SQL::Translator: " . $self->_sqlt_version_error)
2289     if !$self->_sqlt_version_ok;
2290
2291   my $sqlt = SQL::Translator->new( $sqltargs );
2292
2293   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2294   my $sqlt_schema = $sqlt->translate({ data => $schema })
2295     or $self->throw_exception ($sqlt->error);
2296
2297   foreach my $db (@$databases) {
2298     $sqlt->reset();
2299     $sqlt->{schema} = $sqlt_schema;
2300     $sqlt->producer($db);
2301
2302     my $file;
2303     my $filename = $schema->ddl_filename($db, $version, $dir);
2304     if (-e $filename && ($version eq $schema_version )) {
2305       # if we are dumping the current version, overwrite the DDL
2306       carp "Overwriting existing DDL file - $filename";
2307       unlink($filename);
2308     }
2309
2310     my $output = $sqlt->translate;
2311     if(!$output) {
2312       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2313       next;
2314     }
2315     if(!open($file, ">$filename")) {
2316       $self->throw_exception("Can't open $filename for writing ($!)");
2317       next;
2318     }
2319     print $file $output;
2320     close($file);
2321
2322     next unless ($preversion);
2323
2324     require SQL::Translator::Diff;
2325
2326     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2327     if(!-e $prefilename) {
2328       carp("No previous schema file found ($prefilename)");
2329       next;
2330     }
2331
2332     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2333     if(-e $difffile) {
2334       carp("Overwriting existing diff file - $difffile");
2335       unlink($difffile);
2336     }
2337
2338     my $source_schema;
2339     {
2340       my $t = SQL::Translator->new($sqltargs);
2341       $t->debug( 0 );
2342       $t->trace( 0 );
2343
2344       $t->parser( $db )
2345         or $self->throw_exception ($t->error);
2346
2347       my $out = $t->translate( $prefilename )
2348         or $self->throw_exception ($t->error);
2349
2350       $source_schema = $t->schema;
2351
2352       $source_schema->name( $prefilename )
2353         unless ( $source_schema->name );
2354     }
2355
2356     # The "new" style of producers have sane normalization and can support
2357     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2358     # And we have to diff parsed SQL against parsed SQL.
2359     my $dest_schema = $sqlt_schema;
2360
2361     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2362       my $t = SQL::Translator->new($sqltargs);
2363       $t->debug( 0 );
2364       $t->trace( 0 );
2365
2366       $t->parser( $db )
2367         or $self->throw_exception ($t->error);
2368
2369       my $out = $t->translate( $filename )
2370         or $self->throw_exception ($t->error);
2371
2372       $dest_schema = $t->schema;
2373
2374       $dest_schema->name( $filename )
2375         unless $dest_schema->name;
2376     }
2377
2378     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2379                                                   $dest_schema,   $db,
2380                                                   $sqltargs
2381                                                  );
2382     if(!open $file, ">$difffile") {
2383       $self->throw_exception("Can't write to $difffile ($!)");
2384       next;
2385     }
2386     print $file $diff;
2387     close($file);
2388   }
2389 }
2390
2391 =head2 deployment_statements
2392
2393 =over 4
2394
2395 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2396
2397 =back
2398
2399 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2400
2401 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2402 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2403
2404 C<$directory> is used to return statements from files in a previously created
2405 L</create_ddl_dir> directory and is optional. The filenames are constructed
2406 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2407
2408 If no C<$directory> is specified then the statements are constructed on the
2409 fly using L<SQL::Translator> and C<$version> is ignored.
2410
2411 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2412
2413 =cut
2414
2415 sub deployment_statements {
2416   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2417   $type ||= $self->sqlt_type;
2418   $version ||= $schema->schema_version || '1.x';
2419   $dir ||= './';
2420   my $filename = $schema->ddl_filename($type, $version, $dir);
2421   if(-f $filename)
2422   {
2423       my $file;
2424       open($file, "<$filename")
2425         or $self->throw_exception("Can't open $filename ($!)");
2426       my @rows = <$file>;
2427       close($file);
2428       return join('', @rows);
2429   }
2430
2431   $self->throw_exception("Can't deploy without either SQL::Translator or a ddl_dir: " . $self->_sqlt_version_error )
2432     if !$self->_sqlt_version_ok;
2433
2434   # sources needs to be a parser arg, but for simplicty allow at top level
2435   # coming in
2436   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2437       if exists $sqltargs->{sources};
2438
2439   my $tr = SQL::Translator->new(
2440     producer => "SQL::Translator::Producer::${type}",
2441     %$sqltargs,
2442     parser => 'SQL::Translator::Parser::DBIx::Class',
2443     data => $schema,
2444   );
2445   return $tr->translate;
2446 }
2447
2448 sub deploy {
2449   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2450   my $deploy = sub {
2451     my $line = shift;
2452     return if($line =~ /^--/);
2453     return if(!$line);
2454     # next if($line =~ /^DROP/m);
2455     return if($line =~ /^BEGIN TRANSACTION/m);
2456     return if($line =~ /^COMMIT/m);
2457     return if $line =~ /^\s+$/; # skip whitespace only
2458     $self->_query_start($line);
2459     eval {
2460       # do a dbh_do cycle here, as we need some error checking in
2461       # place (even though we will ignore errors)
2462       $self->dbh_do (sub { $_[1]->do($line) });
2463     };
2464     if ($@) {
2465       carp qq{$@ (running "${line}")};
2466     }
2467     $self->_query_end($line);
2468   };
2469   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2470   if (@statements > 1) {
2471     foreach my $statement (@statements) {
2472       $deploy->( $statement );
2473     }
2474   }
2475   elsif (@statements == 1) {
2476     foreach my $line ( split(";\n", $statements[0])) {
2477       $deploy->( $line );
2478     }
2479   }
2480 }
2481
2482 =head2 datetime_parser
2483
2484 Returns the datetime parser class
2485
2486 =cut
2487
2488 sub datetime_parser {
2489   my $self = shift;
2490   return $self->{datetime_parser} ||= do {
2491     $self->build_datetime_parser(@_);
2492   };
2493 }
2494
2495 =head2 datetime_parser_type
2496
2497 Defines (returns) the datetime parser class - currently hardwired to
2498 L<DateTime::Format::MySQL>
2499
2500 =cut
2501
2502 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2503
2504 =head2 build_datetime_parser
2505
2506 See L</datetime_parser>
2507
2508 =cut
2509
2510 sub build_datetime_parser {
2511   if (not $_[0]->_driver_determined) {
2512     $_[0]->_determine_driver;
2513     goto $_[0]->can('build_datetime_parser');
2514   }
2515
2516   my $self = shift;
2517   my $type = $self->datetime_parser_type(@_);
2518   $self->ensure_class_loaded ($type);
2519   return $type;
2520 }
2521
2522
2523 =head2 is_replicating
2524
2525 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2526 replicate from a master database.  Default is undef, which is the result
2527 returned by databases that don't support replication.
2528
2529 =cut
2530
2531 sub is_replicating {
2532     return;
2533
2534 }
2535
2536 =head2 lag_behind_master
2537
2538 Returns a number that represents a certain amount of lag behind a master db
2539 when a given storage is replicating.  The number is database dependent, but
2540 starts at zero and increases with the amount of lag. Default in undef
2541
2542 =cut
2543
2544 sub lag_behind_master {
2545     return;
2546 }
2547
2548 sub DESTROY {
2549   my $self = shift;
2550
2551   $self->_verify_pid if $self->_dbh;
2552
2553   # some databases need this to stop spewing warnings
2554   if (my $dbh = $self->_dbh) {
2555     local $@;
2556     eval { $dbh->disconnect };
2557   }
2558
2559   $self->_dbh(undef);
2560 }
2561
2562 1;
2563
2564 =head1 USAGE NOTES
2565
2566 =head2 DBIx::Class and AutoCommit
2567
2568 DBIx::Class can do some wonderful magic with handling exceptions,
2569 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2570 (the default) combined with C<txn_do> for transaction support.
2571
2572 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2573 in an assumed transaction between commits, and you're telling us you'd
2574 like to manage that manually.  A lot of the magic protections offered by
2575 this module will go away.  We can't protect you from exceptions due to database
2576 disconnects because we don't know anything about how to restart your
2577 transactions.  You're on your own for handling all sorts of exceptional
2578 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2579 be with raw DBI.
2580
2581
2582 =head1 AUTHORS
2583
2584 Matt S. Trout <mst@shadowcatsystems.co.uk>
2585
2586 Andy Grundman <andy@hybridized.org>
2587
2588 =head1 LICENSE
2589
2590 You may distribute this code under the same terms as Perl itself.
2591
2592 =cut