support coderef connect_infos for repicated storage
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base 'DBIx::Class::Storage';
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16
17 __PACKAGE__->mk_group_accessors('simple' =>
18   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
19      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
20 );
21
22 # the values for these accessors are picked out (and deleted) from
23 # the attribute hashref passed to connect_info
24 my @storage_options = qw/
25   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
26   disable_sth_caching unsafe auto_savepoint
27 /;
28 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
29
30
31 # default cursor class, overridable in connect_info attributes
32 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
33
34 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
35 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
36
37
38 =head1 NAME
39
40 DBIx::Class::Storage::DBI - DBI storage handler
41
42 =head1 SYNOPSIS
43
44   my $schema = MySchema->connect('dbi:SQLite:my.db');
45
46   $schema->storage->debug(1);
47   $schema->dbh_do("DROP TABLE authors");
48
49   $schema->resultset('Book')->search({
50      written_on => $schema->storage->datetime_parser(DateTime->now)
51   });
52
53 =head1 DESCRIPTION
54
55 This class represents the connection to an RDBMS via L<DBI>.  See
56 L<DBIx::Class::Storage> for general information.  This pod only
57 documents DBI-specific methods and behaviors.
58
59 =head1 METHODS
60
61 =cut
62
63 sub new {
64   my $new = shift->next::method(@_);
65
66   $new->transaction_depth(0);
67   $new->_sql_maker_opts({});
68   $new->{savepoints} = [];
69   $new->{_in_dbh_do} = 0;
70   $new->{_dbh_gen} = 0;
71
72   $new;
73 }
74
75 =head2 connect_info
76
77 This method is normally called by L<DBIx::Class::Schema/connection>, which
78 encapsulates its argument list in an arrayref before passing them here.
79
80 The argument list may contain:
81
82 =over
83
84 =item *
85
86 The same 4-element argument set one would normally pass to
87 L<DBI/connect>, optionally followed by
88 L<extra attributes|/DBIx::Class specific connection attributes>
89 recognized by DBIx::Class:
90
91   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
92
93 =item *
94
95 A single code reference which returns a connected
96 L<DBI database handle|DBI/connect> optionally followed by
97 L<extra attributes|/DBIx::Class specific connection attributes> recognized
98 by DBIx::Class:
99
100   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
101
102 =item *
103
104 A single hashref with all the attributes and the dsn/user/password
105 mixed together:
106
107   $connect_info_args = [{
108     dsn => $dsn,
109     user => $user,
110     password => $pass,
111     %dbi_attributes,
112     %extra_attributes,
113   }];
114
115   $connect_info_args = [{
116     dbh_maker => sub { DBI->connect (...) },
117     %dbi_attributes,
118     %extra_attributes,
119   }];
120
121 This is particularly useful for L<Catalyst> based applications, allowing the
122 following config (L<Config::General> style):
123
124   <Model::DB>
125     schema_class   App::DB
126     <connect_info>
127       dsn          dbi:mysql:database=test
128       user         testuser
129       password     TestPass
130       AutoCommit   1
131     </connect_info>
132   </Model::DB>
133
134 The C<dsn>/C<user>/C<password> combination can be substituted by the
135 C<dbh_maker> key whose value is a coderef that returns a connected
136 L<DBI database handle|DBI/connect>
137
138 =back
139
140 Please note that the L<DBI> docs recommend that you always explicitly
141 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
142 recommends that it be set to I<1>, and that you perform transactions
143 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
144 to I<1> if you do not do explicitly set it to zero.  This is the default
145 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
146
147 =head3 DBIx::Class specific connection attributes
148
149 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
150 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
151 the following connection options. These options can be mixed in with your other
152 L<DBI> connection attributes, or placed in a seperate hashref
153 (C<\%extra_attributes>) as shown above.
154
155 Every time C<connect_info> is invoked, any previous settings for
156 these options will be cleared before setting the new ones, regardless of
157 whether any options are specified in the new C<connect_info>.
158
159
160 =over
161
162 =item on_connect_do
163
164 Specifies things to do immediately after connecting or re-connecting to
165 the database.  Its value may contain:
166
167 =over
168
169 =item a scalar
170
171 This contains one SQL statement to execute.
172
173 =item an array reference
174
175 This contains SQL statements to execute in order.  Each element contains
176 a string or a code reference that returns a string.
177
178 =item a code reference
179
180 This contains some code to execute.  Unlike code references within an
181 array reference, its return value is ignored.
182
183 =back
184
185 =item on_disconnect_do
186
187 Takes arguments in the same form as L</on_connect_do> and executes them
188 immediately before disconnecting from the database.
189
190 Note, this only runs if you explicitly call L</disconnect> on the
191 storage object.
192
193 =item on_connect_call
194
195 A more generalized form of L</on_connect_do> that calls the specified
196 C<connect_call_METHOD> methods in your storage driver.
197
198   on_connect_do => 'select 1'
199
200 is equivalent to:
201
202   on_connect_call => [ [ do_sql => 'select 1' ] ]
203
204 Its values may contain:
205
206 =over
207
208 =item a scalar
209
210 Will call the C<connect_call_METHOD> method.
211
212 =item a code reference
213
214 Will execute C<< $code->($storage) >>
215
216 =item an array reference
217
218 Each value can be a method name or code reference.
219
220 =item an array of arrays
221
222 For each array, the first item is taken to be the C<connect_call_> method name
223 or code reference, and the rest are parameters to it.
224
225 =back
226
227 Some predefined storage methods you may use:
228
229 =over
230
231 =item do_sql
232
233 Executes a SQL string or a code reference that returns a SQL string. This is
234 what L</on_connect_do> and L</on_disconnect_do> use.
235
236 It can take:
237
238 =over
239
240 =item a scalar
241
242 Will execute the scalar as SQL.
243
244 =item an arrayref
245
246 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
247 attributes hashref and bind values.
248
249 =item a code reference
250
251 Will execute C<< $code->($storage) >> and execute the return array refs as
252 above.
253
254 =back
255
256 =item datetime_setup
257
258 Execute any statements necessary to initialize the database session to return
259 and accept datetime/timestamp values used with
260 L<DBIx::Class::InflateColumn::DateTime>.
261
262 Only necessary for some databases, see your specific storage driver for
263 implementation details.
264
265 =back
266
267 =item on_disconnect_call
268
269 Takes arguments in the same form as L</on_connect_call> and executes them
270 immediately before disconnecting from the database.
271
272 Calls the C<disconnect_call_METHOD> methods as opposed to the
273 C<connect_call_METHOD> methods called by L</on_connect_call>.
274
275 Note, this only runs if you explicitly call L</disconnect> on the
276 storage object.
277
278 =item disable_sth_caching
279
280 If set to a true value, this option will disable the caching of
281 statement handles via L<DBI/prepare_cached>.
282
283 =item limit_dialect
284
285 Sets the limit dialect. This is useful for JDBC-bridge among others
286 where the remote SQL-dialect cannot be determined by the name of the
287 driver alone. See also L<SQL::Abstract::Limit>.
288
289 =item quote_char
290
291 Specifies what characters to use to quote table and column names. If
292 you use this you will want to specify L</name_sep> as well.
293
294 C<quote_char> expects either a single character, in which case is it
295 is placed on either side of the table/column name, or an arrayref of length
296 2 in which case the table/column name is placed between the elements.
297
298 For example under MySQL you should use C<< quote_char => '`' >>, and for
299 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
300
301 =item name_sep
302
303 This only needs to be used in conjunction with C<quote_char>, and is used to
304 specify the charecter that seperates elements (schemas, tables, columns) from
305 each other. In most cases this is simply a C<.>.
306
307 The consequences of not supplying this value is that L<SQL::Abstract>
308 will assume DBIx::Class' uses of aliases to be complete column
309 names. The output will look like I<"me.name"> when it should actually
310 be I<"me"."name">.
311
312 =item unsafe
313
314 This Storage driver normally installs its own C<HandleError>, sets
315 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
316 all database handles, including those supplied by a coderef.  It does this
317 so that it can have consistent and useful error behavior.
318
319 If you set this option to a true value, Storage will not do its usual
320 modifications to the database handle's attributes, and instead relies on
321 the settings in your connect_info DBI options (or the values you set in
322 your connection coderef, in the case that you are connecting via coderef).
323
324 Note that your custom settings can cause Storage to malfunction,
325 especially if you set a C<HandleError> handler that suppresses exceptions
326 and/or disable C<RaiseError>.
327
328 =item auto_savepoint
329
330 If this option is true, L<DBIx::Class> will use savepoints when nesting
331 transactions, making it possible to recover from failure in the inner
332 transaction without having to abort all outer transactions.
333
334 =item cursor_class
335
336 Use this argument to supply a cursor class other than the default
337 L<DBIx::Class::Storage::DBI::Cursor>.
338
339 =back
340
341 Some real-life examples of arguments to L</connect_info> and
342 L<DBIx::Class::Schema/connect>
343
344   # Simple SQLite connection
345   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
346
347   # Connect via subref
348   ->connect_info([ sub { DBI->connect(...) } ]);
349
350   # Connect via subref in hashref
351   ->connect_info([{
352     dbh_maker => sub { DBI->connect(...) },
353     on_connect_do => 'alter session ...',
354   }]);
355
356   # A bit more complicated
357   ->connect_info(
358     [
359       'dbi:Pg:dbname=foo',
360       'postgres',
361       'my_pg_password',
362       { AutoCommit => 1 },
363       { quote_char => q{"}, name_sep => q{.} },
364     ]
365   );
366
367   # Equivalent to the previous example
368   ->connect_info(
369     [
370       'dbi:Pg:dbname=foo',
371       'postgres',
372       'my_pg_password',
373       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
374     ]
375   );
376
377   # Same, but with hashref as argument
378   # See parse_connect_info for explanation
379   ->connect_info(
380     [{
381       dsn         => 'dbi:Pg:dbname=foo',
382       user        => 'postgres',
383       password    => 'my_pg_password',
384       AutoCommit  => 1,
385       quote_char  => q{"},
386       name_sep    => q{.},
387     }]
388   );
389
390   # Subref + DBIx::Class-specific connection options
391   ->connect_info(
392     [
393       sub { DBI->connect(...) },
394       {
395           quote_char => q{`},
396           name_sep => q{@},
397           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
398           disable_sth_caching => 1,
399       },
400     ]
401   );
402
403
404
405 =cut
406
407 sub connect_info {
408   my ($self, $info_arg) = @_;
409
410   return $self->_connect_info if !$info_arg;
411
412   my @args = @$info_arg;  # take a shallow copy for further mutilation
413   $self->_connect_info([@args]); # copy for _connect_info
414
415
416   # combine/pre-parse arguments depending on invocation style
417
418   my %attrs;
419   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
420     %attrs = %{ $args[1] || {} };
421     @args = $args[0];
422   }
423   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
424     %attrs = %{$args[0]};
425     @args = ();
426     if (my $code = delete $attrs{dbh_maker}) {
427       @args = $code;
428
429       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
430       if (@ignored) {
431         carp sprintf (
432             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
433           . "to the result of 'dbh_maker'",
434
435           join (', ', map { "'$_'" } (@ignored) ),
436         );
437       }
438     }
439     else {
440       @args = delete @attrs{qw/dsn user password/};
441     }
442   }
443   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
444     %attrs = (
445       % { $args[3] || {} },
446       % { $args[4] || {} },
447     );
448     @args = @args[0,1,2];
449   }
450
451   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
452   #  the new set of options
453   $self->_sql_maker(undef);
454   $self->_sql_maker_opts({});
455
456   if(keys %attrs) {
457     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
458       if(my $value = delete $attrs{$storage_opt}) {
459         $self->$storage_opt($value);
460       }
461     }
462     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
463       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
464         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
465       }
466     }
467   }
468
469   if (ref $args[0] eq 'CODE') {
470     # _connect() never looks past $args[0] in this case
471     %attrs = ()
472   } else {
473     %attrs = (
474       %{ $self->_default_dbi_connect_attributes || {} },
475       %attrs,
476     );
477   }
478
479   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
480   $self->_connect_info;
481 }
482
483 sub _default_dbi_connect_attributes {
484   return {
485     AutoCommit => 1,
486     RaiseError => 1,
487     PrintError => 0,
488   };
489 }
490
491 =head2 on_connect_do
492
493 This method is deprecated in favour of setting via L</connect_info>.
494
495 =cut
496
497 =head2 on_disconnect_do
498
499 This method is deprecated in favour of setting via L</connect_info>.
500
501 =cut
502
503 sub _parse_connect_do {
504   my ($self, $type) = @_;
505
506   my $val = $self->$type;
507   return () if not defined $val;
508
509   my @res;
510
511   if (not ref($val)) {
512     push @res, [ 'do_sql', $val ];
513   } elsif (ref($val) eq 'CODE') {
514     push @res, $val;
515   } elsif (ref($val) eq 'ARRAY') {
516     push @res, map { [ 'do_sql', $_ ] } @$val;
517   } else {
518     $self->throw_exception("Invalid type for $type: ".ref($val));
519   }
520
521   return \@res;
522 }
523
524 =head2 dbh_do
525
526 Arguments: ($subref | $method_name), @extra_coderef_args?
527
528 Execute the given $subref or $method_name using the new exception-based
529 connection management.
530
531 The first two arguments will be the storage object that C<dbh_do> was called
532 on and a database handle to use.  Any additional arguments will be passed
533 verbatim to the called subref as arguments 2 and onwards.
534
535 Using this (instead of $self->_dbh or $self->dbh) ensures correct
536 exception handling and reconnection (or failover in future subclasses).
537
538 Your subref should have no side-effects outside of the database, as
539 there is the potential for your subref to be partially double-executed
540 if the database connection was stale/dysfunctional.
541
542 Example:
543
544   my @stuff = $schema->storage->dbh_do(
545     sub {
546       my ($storage, $dbh, @cols) = @_;
547       my $cols = join(q{, }, @cols);
548       $dbh->selectrow_array("SELECT $cols FROM foo");
549     },
550     @column_list
551   );
552
553 =cut
554
555 sub dbh_do {
556   my $self = shift;
557   my $code = shift;
558
559   my $dbh = $self->_dbh;
560
561   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
562       || $self->{transaction_depth};
563
564   local $self->{_in_dbh_do} = 1;
565
566   my @result;
567   my $want_array = wantarray;
568
569   eval {
570     $self->_verify_pid if $dbh;
571     if(!$self->_dbh) {
572         $self->_populate_dbh;
573         $dbh = $self->_dbh;
574     }
575
576     if($want_array) {
577         @result = $self->$code($dbh, @_);
578     }
579     elsif(defined $want_array) {
580         $result[0] = $self->$code($dbh, @_);
581     }
582     else {
583         $self->$code($dbh, @_);
584     }
585   };
586
587   # ->connected might unset $@ - copy
588   my $exception = $@;
589   if(!$exception) { return $want_array ? @result : $result[0] }
590
591   $self->throw_exception($exception) if $self->connected;
592
593   # We were not connected - reconnect and retry, but let any
594   #  exception fall right through this time
595   carp "Retrying $code after catching disconnected exception: $exception"
596     if $ENV{DBIC_DBIRETRY_DEBUG};
597   $self->_populate_dbh;
598   $self->$code($self->_dbh, @_);
599 }
600
601 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
602 # It also informs dbh_do to bypass itself while under the direction of txn_do,
603 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
604 sub txn_do {
605   my $self = shift;
606   my $coderef = shift;
607
608   ref $coderef eq 'CODE' or $self->throw_exception
609     ('$coderef must be a CODE reference');
610
611   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
612
613   local $self->{_in_dbh_do} = 1;
614
615   my @result;
616   my $want_array = wantarray;
617
618   my $tried = 0;
619   while(1) {
620     eval {
621       $self->_verify_pid if $self->_dbh;
622       $self->_populate_dbh if !$self->_dbh;
623
624       $self->txn_begin;
625       if($want_array) {
626           @result = $coderef->(@_);
627       }
628       elsif(defined $want_array) {
629           $result[0] = $coderef->(@_);
630       }
631       else {
632           $coderef->(@_);
633       }
634       $self->txn_commit;
635     };
636
637     # ->connected might unset $@ - copy
638     my $exception = $@;
639     if(!$exception) { return $want_array ? @result : $result[0] }
640
641     if($tried++ || $self->connected) {
642       eval { $self->txn_rollback };
643       my $rollback_exception = $@;
644       if($rollback_exception) {
645         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
646         $self->throw_exception($exception)  # propagate nested rollback
647           if $rollback_exception =~ /$exception_class/;
648
649         $self->throw_exception(
650           "Transaction aborted: ${exception}. "
651           . "Rollback failed: ${rollback_exception}"
652         );
653       }
654       $self->throw_exception($exception)
655     }
656
657     # We were not connected, and was first try - reconnect and retry
658     # via the while loop
659     carp "Retrying $coderef after catching disconnected exception: $exception"
660       if $ENV{DBIC_DBIRETRY_DEBUG};
661     $self->_populate_dbh;
662   }
663 }
664
665 =head2 disconnect
666
667 Our C<disconnect> method also performs a rollback first if the
668 database is not in C<AutoCommit> mode.
669
670 =cut
671
672 sub disconnect {
673   my ($self) = @_;
674
675   if( $self->_dbh ) {
676     my @actions;
677
678     push @actions, ( $self->on_disconnect_call || () );
679     push @actions, $self->_parse_connect_do ('on_disconnect_do');
680
681     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
682
683     $self->_dbh->rollback unless $self->_dbh_autocommit;
684     $self->_dbh->disconnect;
685     $self->_dbh(undef);
686     $self->{_dbh_gen}++;
687   }
688 }
689
690 =head2 with_deferred_fk_checks
691
692 =over 4
693
694 =item Arguments: C<$coderef>
695
696 =item Return Value: The return value of $coderef
697
698 =back
699
700 Storage specific method to run the code ref with FK checks deferred or
701 in MySQL's case disabled entirely.
702
703 =cut
704
705 # Storage subclasses should override this
706 sub with_deferred_fk_checks {
707   my ($self, $sub) = @_;
708
709   $sub->();
710 }
711
712 =head2 connected
713
714 =over
715
716 =item Arguments: none
717
718 =item Return Value: 1|0
719
720 =back
721
722 Verifies that the the current database handle is active and ready to execute
723 an SQL statement (i.e. the connection did not get stale, server is still
724 answering, etc.) This method is used internally by L</dbh>.
725
726 =cut
727
728 sub connected {
729   my $self = shift;
730   return 0 unless $self->_seems_connected;
731
732   #be on the safe side
733   local $self->_dbh->{RaiseError} = 1;
734
735   return $self->_ping;
736 }
737
738 sub _seems_connected {
739   my $self = shift;
740
741   my $dbh = $self->_dbh
742     or return 0;
743
744   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
745     $self->_dbh(undef);
746     $self->{_dbh_gen}++;
747     return 0;
748   }
749   else {
750     $self->_verify_pid;
751     return 0 if !$self->_dbh;
752   }
753
754   return $dbh->FETCH('Active');
755 }
756
757 sub _ping {
758   my $self = shift;
759
760   my $dbh = $self->_dbh or return 0;
761
762   return $dbh->ping;
763 }
764
765 # handle pid changes correctly
766 #  NOTE: assumes $self->_dbh is a valid $dbh
767 sub _verify_pid {
768   my ($self) = @_;
769
770   return if defined $self->_conn_pid && $self->_conn_pid == $$;
771
772   $self->_dbh->{InactiveDestroy} = 1;
773   $self->_dbh(undef);
774   $self->{_dbh_gen}++;
775
776   return;
777 }
778
779 sub ensure_connected {
780   my ($self) = @_;
781
782   unless ($self->connected) {
783     $self->_populate_dbh;
784   }
785 }
786
787 =head2 dbh
788
789 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
790 is guaranteed to be healthy by implicitly calling L</connected>, and if
791 necessary performing a reconnection before returning. Keep in mind that this
792 is very B<expensive> on some database engines. Consider using L<dbh_do>
793 instead.
794
795 =cut
796
797 sub dbh {
798   my ($self) = @_;
799
800   if (not $self->_dbh) {
801     $self->_populate_dbh;
802   } else {
803     $self->ensure_connected;
804   }
805   return $self->_dbh;
806 }
807
808 # this is the internal "get dbh or connect (don't check)" method
809 sub _get_dbh {
810   my $self = shift;
811   $self->_populate_dbh unless $self->_dbh;
812   return $self->_dbh;
813 }
814
815 sub _sql_maker_args {
816     my ($self) = @_;
817
818     return (
819       bindtype=>'columns',
820       array_datatypes => 1,
821       limit_dialect => $self->_get_dbh,
822       %{$self->_sql_maker_opts}
823     );
824 }
825
826 sub sql_maker {
827   my ($self) = @_;
828   unless ($self->_sql_maker) {
829     my $sql_maker_class = $self->sql_maker_class;
830     $self->ensure_class_loaded ($sql_maker_class);
831     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
832   }
833   return $self->_sql_maker;
834 }
835
836 sub _rebless {}
837
838 sub _populate_dbh {
839   my ($self) = @_;
840
841   my @info = @{$self->_dbi_connect_info || []};
842   $self->_dbh(undef); # in case ->connected failed we might get sent here
843   $self->_dbh($self->_connect(@info));
844
845   $self->_conn_pid($$);
846   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
847
848   $self->_determine_driver;
849
850   # Always set the transaction depth on connect, since
851   #  there is no transaction in progress by definition
852   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
853
854   $self->_run_connection_actions unless $self->{_in_determine_driver};
855 }
856
857 sub _run_connection_actions {
858   my $self = shift;
859   my @actions;
860
861   push @actions, ( $self->on_connect_call || () );
862   push @actions, $self->_parse_connect_do ('on_connect_do');
863
864   $self->_do_connection_actions(connect_call_ => $_) for @actions;
865 }
866
867 sub _determine_driver {
868   my ($self) = @_;
869
870   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
871     my $started_unconnected = 0;
872     local $self->{_in_determine_driver} = 1;
873
874     if (ref($self) eq __PACKAGE__) {
875       my $driver;
876       if ($self->_dbh) { # we are connected
877         $driver = $self->_dbh->{Driver}{Name};
878       } else {
879         # if connect_info is a CODEREF, we have no choice but to connect
880         if (ref $self->_dbi_connect_info->[0] &&
881             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
882           $self->_populate_dbh;
883           $driver = $self->_dbh->{Driver}{Name};
884         }
885         else {
886           # try to use dsn to not require being connected, the driver may still
887           # force a connection in _rebless to determine version
888           ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
889           $started_unconnected = 1;
890         }
891       }
892
893       my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
894       if ($self->load_optional_class($storage_class)) {
895         mro::set_mro($storage_class, 'c3');
896         bless $self, $storage_class;
897         $self->_rebless();
898       }
899     }
900
901     $self->_driver_determined(1);
902
903     $self->_run_connection_actions
904         if $started_unconnected && defined $self->_dbh;
905   }
906 }
907
908 sub _do_connection_actions {
909   my $self          = shift;
910   my $method_prefix = shift;
911   my $call          = shift;
912
913   if (not ref($call)) {
914     my $method = $method_prefix . $call;
915     $self->$method(@_);
916   } elsif (ref($call) eq 'CODE') {
917     $self->$call(@_);
918   } elsif (ref($call) eq 'ARRAY') {
919     if (ref($call->[0]) ne 'ARRAY') {
920       $self->_do_connection_actions($method_prefix, $_) for @$call;
921     } else {
922       $self->_do_connection_actions($method_prefix, @$_) for @$call;
923     }
924   } else {
925     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
926   }
927
928   return $self;
929 }
930
931 sub connect_call_do_sql {
932   my $self = shift;
933   $self->_do_query(@_);
934 }
935
936 sub disconnect_call_do_sql {
937   my $self = shift;
938   $self->_do_query(@_);
939 }
940
941 # override in db-specific backend when necessary
942 sub connect_call_datetime_setup { 1 }
943
944 sub _do_query {
945   my ($self, $action) = @_;
946
947   if (ref $action eq 'CODE') {
948     $action = $action->($self);
949     $self->_do_query($_) foreach @$action;
950   }
951   else {
952     # Most debuggers expect ($sql, @bind), so we need to exclude
953     # the attribute hash which is the second argument to $dbh->do
954     # furthermore the bind values are usually to be presented
955     # as named arrayref pairs, so wrap those here too
956     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
957     my $sql = shift @do_args;
958     my $attrs = shift @do_args;
959     my @bind = map { [ undef, $_ ] } @do_args;
960
961     $self->_query_start($sql, @bind);
962     $self->_dbh->do($sql, $attrs, @do_args);
963     $self->_query_end($sql, @bind);
964   }
965
966   return $self;
967 }
968
969 sub _connect {
970   my ($self, @info) = @_;
971
972   $self->throw_exception("You failed to provide any connection info")
973     if !@info;
974
975   my ($old_connect_via, $dbh);
976
977   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
978     $old_connect_via = $DBI::connect_via;
979     $DBI::connect_via = 'connect';
980   }
981
982   eval {
983     if(ref $info[0] eq 'CODE') {
984        $dbh = &{$info[0]}
985     }
986     else {
987        $dbh = DBI->connect(@info);
988     }
989
990     if($dbh && !$self->unsafe) {
991       my $weak_self = $self;
992       Scalar::Util::weaken($weak_self);
993       $dbh->{HandleError} = sub {
994           if ($weak_self) {
995             $weak_self->throw_exception("DBI Exception: $_[0]");
996           }
997           else {
998             croak ("DBI Exception: $_[0]");
999           }
1000       };
1001       $dbh->{ShowErrorStatement} = 1;
1002       $dbh->{RaiseError} = 1;
1003       $dbh->{PrintError} = 0;
1004     }
1005   };
1006
1007   $DBI::connect_via = $old_connect_via if $old_connect_via;
1008
1009   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1010     if !$dbh || $@;
1011
1012   $self->_dbh_autocommit($dbh->{AutoCommit});
1013
1014   $dbh;
1015 }
1016
1017 sub svp_begin {
1018   my ($self, $name) = @_;
1019
1020   $name = $self->_svp_generate_name
1021     unless defined $name;
1022
1023   $self->throw_exception ("You can't use savepoints outside a transaction")
1024     if $self->{transaction_depth} == 0;
1025
1026   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1027     unless $self->can('_svp_begin');
1028
1029   push @{ $self->{savepoints} }, $name;
1030
1031   $self->debugobj->svp_begin($name) if $self->debug;
1032
1033   return $self->_svp_begin($name);
1034 }
1035
1036 sub svp_release {
1037   my ($self, $name) = @_;
1038
1039   $self->throw_exception ("You can't use savepoints outside a transaction")
1040     if $self->{transaction_depth} == 0;
1041
1042   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1043     unless $self->can('_svp_release');
1044
1045   if (defined $name) {
1046     $self->throw_exception ("Savepoint '$name' does not exist")
1047       unless grep { $_ eq $name } @{ $self->{savepoints} };
1048
1049     # Dig through the stack until we find the one we are releasing.  This keeps
1050     # the stack up to date.
1051     my $svp;
1052
1053     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1054   } else {
1055     $name = pop @{ $self->{savepoints} };
1056   }
1057
1058   $self->debugobj->svp_release($name) if $self->debug;
1059
1060   return $self->_svp_release($name);
1061 }
1062
1063 sub svp_rollback {
1064   my ($self, $name) = @_;
1065
1066   $self->throw_exception ("You can't use savepoints outside a transaction")
1067     if $self->{transaction_depth} == 0;
1068
1069   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1070     unless $self->can('_svp_rollback');
1071
1072   if (defined $name) {
1073       # If they passed us a name, verify that it exists in the stack
1074       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1075           $self->throw_exception("Savepoint '$name' does not exist!");
1076       }
1077
1078       # Dig through the stack until we find the one we are releasing.  This keeps
1079       # the stack up to date.
1080       while(my $s = pop(@{ $self->{savepoints} })) {
1081           last if($s eq $name);
1082       }
1083       # Add the savepoint back to the stack, as a rollback doesn't remove the
1084       # named savepoint, only everything after it.
1085       push(@{ $self->{savepoints} }, $name);
1086   } else {
1087       # We'll assume they want to rollback to the last savepoint
1088       $name = $self->{savepoints}->[-1];
1089   }
1090
1091   $self->debugobj->svp_rollback($name) if $self->debug;
1092
1093   return $self->_svp_rollback($name);
1094 }
1095
1096 sub _svp_generate_name {
1097     my ($self) = @_;
1098
1099     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1100 }
1101
1102 sub txn_begin {
1103   my $self = shift;
1104   if($self->{transaction_depth} == 0) {
1105     $self->debugobj->txn_begin()
1106       if $self->debug;
1107
1108     # being here implies we have AutoCommit => 1
1109     # if the user is utilizing txn_do - good for
1110     # him, otherwise we need to ensure that the
1111     # $dbh is healthy on BEGIN
1112     my $dbh_method = $self->{_in_dbh_do} ? '_dbh' : 'dbh';
1113     $self->$dbh_method->begin_work;
1114
1115   } elsif ($self->auto_savepoint) {
1116     $self->svp_begin;
1117   }
1118   $self->{transaction_depth}++;
1119 }
1120
1121 sub txn_commit {
1122   my $self = shift;
1123   if ($self->{transaction_depth} == 1) {
1124     my $dbh = $self->_dbh;
1125     $self->debugobj->txn_commit()
1126       if ($self->debug);
1127     $dbh->commit;
1128     $self->{transaction_depth} = 0
1129       if $self->_dbh_autocommit;
1130   }
1131   elsif($self->{transaction_depth} > 1) {
1132     $self->{transaction_depth}--;
1133     $self->svp_release
1134       if $self->auto_savepoint;
1135   }
1136 }
1137
1138 sub txn_rollback {
1139   my $self = shift;
1140   my $dbh = $self->_dbh;
1141   eval {
1142     if ($self->{transaction_depth} == 1) {
1143       $self->debugobj->txn_rollback()
1144         if ($self->debug);
1145       $self->{transaction_depth} = 0
1146         if $self->_dbh_autocommit;
1147       $dbh->rollback;
1148     }
1149     elsif($self->{transaction_depth} > 1) {
1150       $self->{transaction_depth}--;
1151       if ($self->auto_savepoint) {
1152         $self->svp_rollback;
1153         $self->svp_release;
1154       }
1155     }
1156     else {
1157       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1158     }
1159   };
1160   if ($@) {
1161     my $error = $@;
1162     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1163     $error =~ /$exception_class/ and $self->throw_exception($error);
1164     # ensure that a failed rollback resets the transaction depth
1165     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1166     $self->throw_exception($error);
1167   }
1168 }
1169
1170 # This used to be the top-half of _execute.  It was split out to make it
1171 #  easier to override in NoBindVars without duping the rest.  It takes up
1172 #  all of _execute's args, and emits $sql, @bind.
1173 sub _prep_for_execute {
1174   my ($self, $op, $extra_bind, $ident, $args) = @_;
1175
1176   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1177     $ident = $ident->from();
1178   }
1179
1180   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1181
1182   unshift(@bind,
1183     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1184       if $extra_bind;
1185   return ($sql, \@bind);
1186 }
1187
1188
1189 sub _fix_bind_params {
1190     my ($self, @bind) = @_;
1191
1192     ### Turn @bind from something like this:
1193     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1194     ### to this:
1195     ###   ( "'1'", "'1'", "'3'" )
1196     return
1197         map {
1198             if ( defined( $_ && $_->[1] ) ) {
1199                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1200             }
1201             else { q{'NULL'}; }
1202         } @bind;
1203 }
1204
1205 sub _query_start {
1206     my ( $self, $sql, @bind ) = @_;
1207
1208     if ( $self->debug ) {
1209         @bind = $self->_fix_bind_params(@bind);
1210
1211         $self->debugobj->query_start( $sql, @bind );
1212     }
1213 }
1214
1215 sub _query_end {
1216     my ( $self, $sql, @bind ) = @_;
1217
1218     if ( $self->debug ) {
1219         @bind = $self->_fix_bind_params(@bind);
1220         $self->debugobj->query_end( $sql, @bind );
1221     }
1222 }
1223
1224 sub _dbh_execute {
1225   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1226
1227   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1228
1229   $self->_query_start( $sql, @$bind );
1230
1231   my $sth = $self->sth($sql,$op);
1232
1233   my $placeholder_index = 1;
1234
1235   foreach my $bound (@$bind) {
1236     my $attributes = {};
1237     my($column_name, @data) = @$bound;
1238
1239     if ($bind_attributes) {
1240       $attributes = $bind_attributes->{$column_name}
1241       if defined $bind_attributes->{$column_name};
1242     }
1243
1244     foreach my $data (@data) {
1245       my $ref = ref $data;
1246       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1247
1248       $sth->bind_param($placeholder_index, $data, $attributes);
1249       $placeholder_index++;
1250     }
1251   }
1252
1253   # Can this fail without throwing an exception anyways???
1254   my $rv = $sth->execute();
1255   $self->throw_exception($sth->errstr) if !$rv;
1256
1257   $self->_query_end( $sql, @$bind );
1258
1259   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1260 }
1261
1262 sub _execute {
1263     my $self = shift;
1264     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1265 }
1266
1267 sub insert {
1268   my ($self, $source, $to_insert) = @_;
1269
1270 # redispatch to insert method of storage we reblessed into, if necessary
1271   if (not $self->_driver_determined) {
1272     $self->_determine_driver;
1273     goto $self->can('insert');
1274   }
1275
1276   my $ident = $source->from;
1277   my $bind_attributes = $self->source_bind_attributes($source);
1278
1279   my $updated_cols = {};
1280
1281   foreach my $col ( $source->columns ) {
1282     if ( !defined $to_insert->{$col} ) {
1283       my $col_info = $source->column_info($col);
1284
1285       if ( $col_info->{auto_nextval} ) {
1286         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1287           'nextval',
1288           $col_info->{sequence} ||
1289             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
1290         );
1291       }
1292     }
1293   }
1294
1295   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1296
1297   return $updated_cols;
1298 }
1299
1300 ## Still not quite perfect, and EXPERIMENTAL
1301 ## Currently it is assumed that all values passed will be "normal", i.e. not
1302 ## scalar refs, or at least, all the same type as the first set, the statement is
1303 ## only prepped once.
1304 sub insert_bulk {
1305   my ($self, $source, $cols, $data) = @_;
1306   my %colvalues;
1307   my $table = $source->from;
1308   @colvalues{@$cols} = (0..$#$cols);
1309   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1310
1311   $self->_determine_driver;
1312
1313   $self->_query_start( $sql, @bind );
1314   my $sth = $self->sth($sql);
1315
1316 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1317
1318   ## This must be an arrayref, else nothing works!
1319   my $tuple_status = [];
1320
1321   ## Get the bind_attributes, if any exist
1322   my $bind_attributes = $self->source_bind_attributes($source);
1323
1324   ## Bind the values and execute
1325   my $placeholder_index = 1;
1326
1327   foreach my $bound (@bind) {
1328
1329     my $attributes = {};
1330     my ($column_name, $data_index) = @$bound;
1331
1332     if( $bind_attributes ) {
1333       $attributes = $bind_attributes->{$column_name}
1334       if defined $bind_attributes->{$column_name};
1335     }
1336
1337     my @data = map { $_->[$data_index] } @$data;
1338
1339     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1340     $placeholder_index++;
1341   }
1342   my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
1343   if (my $err = $@) {
1344     my $i = 0;
1345     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1346
1347     $self->throw_exception($sth->errstr || "Unexpected populate error: $err")
1348       if ($i > $#$tuple_status);
1349
1350     require Data::Dumper;
1351     local $Data::Dumper::Terse = 1;
1352     local $Data::Dumper::Indent = 1;
1353     local $Data::Dumper::Useqq = 1;
1354     local $Data::Dumper::Quotekeys = 0;
1355
1356     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1357       $tuple_status->[$i][1],
1358       Data::Dumper::Dumper(
1359         { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) }
1360       ),
1361     );
1362   }
1363   $self->throw_exception($sth->errstr) if !$rv;
1364
1365   $self->_query_end( $sql, @bind );
1366   return (wantarray ? ($rv, $sth, @bind) : $rv);
1367 }
1368
1369 sub update {
1370   my $self = shift @_;
1371   my $source = shift @_;
1372   $self->_determine_driver;
1373   my $bind_attributes = $self->source_bind_attributes($source);
1374
1375   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1376 }
1377
1378
1379 sub delete {
1380   my $self = shift @_;
1381   my $source = shift @_;
1382   $self->_determine_driver;
1383   my $bind_attrs = $self->source_bind_attributes($source);
1384
1385   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1386 }
1387
1388 # We were sent here because the $rs contains a complex search
1389 # which will require a subquery to select the correct rows
1390 # (i.e. joined or limited resultsets)
1391 #
1392 # Genarating a single PK column subquery is trivial and supported
1393 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1394 # Look at _multipk_update_delete()
1395 sub _subq_update_delete {
1396   my $self = shift;
1397   my ($rs, $op, $values) = @_;
1398
1399   my $rsrc = $rs->result_source;
1400
1401   # we already check this, but double check naively just in case. Should be removed soon
1402   my $sel = $rs->_resolved_attrs->{select};
1403   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1404   my @pcols = $rsrc->primary_columns;
1405   if (@$sel != @pcols) {
1406     $self->throw_exception (
1407       'Subquery update/delete can not be called on resultsets selecting a'
1408      .' number of columns different than the number of primary keys'
1409     );
1410   }
1411
1412   if (@pcols == 1) {
1413     return $self->$op (
1414       $rsrc,
1415       $op eq 'update' ? $values : (),
1416       { $pcols[0] => { -in => $rs->as_query } },
1417     );
1418   }
1419
1420   else {
1421     return $self->_multipk_update_delete (@_);
1422   }
1423 }
1424
1425 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1426 # resultset update/delete involving subqueries. So by default resort
1427 # to simple (and inefficient) delete_all style per-row opearations,
1428 # while allowing specific storages to override this with a faster
1429 # implementation.
1430 #
1431 sub _multipk_update_delete {
1432   return shift->_per_row_update_delete (@_);
1433 }
1434
1435 # This is the default loop used to delete/update rows for multi PK
1436 # resultsets, and used by mysql exclusively (because it can't do anything
1437 # else).
1438 #
1439 # We do not use $row->$op style queries, because resultset update/delete
1440 # is not expected to cascade (this is what delete_all/update_all is for).
1441 #
1442 # There should be no race conditions as the entire operation is rolled
1443 # in a transaction.
1444 #
1445 sub _per_row_update_delete {
1446   my $self = shift;
1447   my ($rs, $op, $values) = @_;
1448
1449   my $rsrc = $rs->result_source;
1450   my @pcols = $rsrc->primary_columns;
1451
1452   my $guard = $self->txn_scope_guard;
1453
1454   # emulate the return value of $sth->execute for non-selects
1455   my $row_cnt = '0E0';
1456
1457   my $subrs_cur = $rs->cursor;
1458   while (my @pks = $subrs_cur->next) {
1459
1460     my $cond;
1461     for my $i (0.. $#pcols) {
1462       $cond->{$pcols[$i]} = $pks[$i];
1463     }
1464
1465     $self->$op (
1466       $rsrc,
1467       $op eq 'update' ? $values : (),
1468       $cond,
1469     );
1470
1471     $row_cnt++;
1472   }
1473
1474   $guard->commit;
1475
1476   return $row_cnt;
1477 }
1478
1479 sub _select {
1480   my $self = shift;
1481
1482   # localization is neccessary as
1483   # 1) there is no infrastructure to pass this around before SQLA2
1484   # 2) _select_args sets it and _prep_for_execute consumes it
1485   my $sql_maker = $self->sql_maker;
1486   local $sql_maker->{_dbic_rs_attrs};
1487
1488   return $self->_execute($self->_select_args(@_));
1489 }
1490
1491 sub _select_args_to_query {
1492   my $self = shift;
1493
1494   # localization is neccessary as
1495   # 1) there is no infrastructure to pass this around before SQLA2
1496   # 2) _select_args sets it and _prep_for_execute consumes it
1497   my $sql_maker = $self->sql_maker;
1498   local $sql_maker->{_dbic_rs_attrs};
1499
1500   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1501   #  = $self->_select_args($ident, $select, $cond, $attrs);
1502   my ($op, $bind, $ident, $bind_attrs, @args) =
1503     $self->_select_args(@_);
1504
1505   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1506   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1507   $prepared_bind ||= [];
1508
1509   return wantarray
1510     ? ($sql, $prepared_bind, $bind_attrs)
1511     : \[ "($sql)", @$prepared_bind ]
1512   ;
1513 }
1514
1515 sub _select_args {
1516   my ($self, $ident, $select, $where, $attrs) = @_;
1517
1518   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1519
1520   my $sql_maker = $self->sql_maker;
1521   $sql_maker->{_dbic_rs_attrs} = {
1522     %$attrs,
1523     select => $select,
1524     from => $ident,
1525     where => $where,
1526     $rs_alias
1527       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1528       : ()
1529     ,
1530   };
1531
1532   # calculate bind_attrs before possible $ident mangling
1533   my $bind_attrs = {};
1534   for my $alias (keys %$alias2source) {
1535     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1536     for my $col (keys %$bindtypes) {
1537
1538       my $fqcn = join ('.', $alias, $col);
1539       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1540
1541       # Unqialified column names are nice, but at the same time can be
1542       # rather ambiguous. What we do here is basically go along with
1543       # the loop, adding an unqualified column slot to $bind_attrs,
1544       # alongside the fully qualified name. As soon as we encounter
1545       # another column by that name (which would imply another table)
1546       # we unset the unqualified slot and never add any info to it
1547       # to avoid erroneous type binding. If this happens the users
1548       # only choice will be to fully qualify his column name
1549
1550       if (exists $bind_attrs->{$col}) {
1551         $bind_attrs->{$col} = {};
1552       }
1553       else {
1554         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1555       }
1556     }
1557   }
1558
1559   # adjust limits
1560   if (
1561     $attrs->{software_limit}
1562       ||
1563     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1564   ) {
1565     $attrs->{software_limit} = 1;
1566   }
1567   else {
1568     $self->throw_exception("rows attribute must be positive if present")
1569       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1570
1571     # MySQL actually recommends this approach.  I cringe.
1572     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1573   }
1574
1575   my @limit;
1576
1577   # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
1578   # otherwise delegate the limiting to the storage, unless software limit was requested
1579   if (
1580     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1581        ||
1582     ( $attrs->{group_by} && @{$attrs->{group_by}} &&
1583       $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
1584   ) {
1585     ($ident, $select, $where, $attrs)
1586       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1587   }
1588   elsif (! $attrs->{software_limit} ) {
1589     push @limit, $attrs->{rows}, $attrs->{offset};
1590   }
1591
1592 ###
1593   # This would be the point to deflate anything found in $where
1594   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1595   # expect a row object. And all we have is a resultsource (it is trivial
1596   # to extract deflator coderefs via $alias2source above).
1597   #
1598   # I don't see a way forward other than changing the way deflators are
1599   # invoked, and that's just bad...
1600 ###
1601
1602   my $order = { map
1603     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1604     (qw/order_by group_by having/ )
1605   };
1606
1607   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1608 }
1609
1610 #
1611 # This is the code producing joined subqueries like:
1612 # SELECT me.*, other.* FROM ( SELECT me.* FROM ... ) JOIN other ON ... 
1613 #
1614 sub _adjust_select_args_for_complex_prefetch {
1615   my ($self, $from, $select, $where, $attrs) = @_;
1616
1617   $self->throw_exception ('Nothing to prefetch... how did we get here?!')
1618     if not @{$attrs->{_prefetch_select}};
1619
1620   $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
1621     if (ref $from ne 'ARRAY' || ref $from->[0] ne 'HASH' || ref $from->[1] ne 'ARRAY');
1622
1623
1624   # generate inner/outer attribute lists, remove stuff that doesn't apply
1625   my $outer_attrs = { %$attrs };
1626   delete $outer_attrs->{$_} for qw/where bind rows offset group_by having/;
1627
1628   my $inner_attrs = { %$attrs };
1629   delete $inner_attrs->{$_} for qw/for collapse _prefetch_select _collapse_order_by select as/;
1630
1631
1632   # bring over all non-collapse-induced order_by into the inner query (if any)
1633   # the outer one will have to keep them all
1634   delete $inner_attrs->{order_by};
1635   if (my $ord_cnt = @{$outer_attrs->{order_by}} - @{$outer_attrs->{_collapse_order_by}} ) {
1636     $inner_attrs->{order_by} = [
1637       @{$outer_attrs->{order_by}}[ 0 .. $ord_cnt - 1]
1638     ];
1639   }
1640
1641
1642   # generate the inner/outer select lists
1643   # for inside we consider only stuff *not* brought in by the prefetch
1644   # on the outside we substitute any function for its alias
1645   my $outer_select = [ @$select ];
1646   my $inner_select = [];
1647   for my $i (0 .. ( @$outer_select - @{$outer_attrs->{_prefetch_select}} - 1) ) {
1648     my $sel = $outer_select->[$i];
1649
1650     if (ref $sel eq 'HASH' ) {
1651       $sel->{-as} ||= $attrs->{as}[$i];
1652       $outer_select->[$i] = join ('.', $attrs->{alias}, ($sel->{-as} || "inner_column_$i") );
1653     }
1654
1655     push @$inner_select, $sel;
1656   }
1657
1658   # normalize a copy of $from, so it will be easier to work with further
1659   # down (i.e. promote the initial hashref to an AoH)
1660   $from = [ @$from ];
1661   $from->[0] = [ $from->[0] ];
1662   my %original_join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
1663
1664
1665   # decide which parts of the join will remain in either part of
1666   # the outer/inner query
1667
1668   # First we compose a list of which aliases are used in restrictions
1669   # (i.e. conditions/order/grouping/etc). Since we do not have
1670   # introspectable SQLA, we fall back to ugly scanning of raw SQL for
1671   # WHERE, and for pieces of ORDER BY in order to determine which aliases
1672   # need to appear in the resulting sql.
1673   # It may not be very efficient, but it's a reasonable stop-gap
1674   # Also unqualified column names will not be considered, but more often
1675   # than not this is actually ok
1676   #
1677   # In the same loop we enumerate part of the selection aliases, as
1678   # it requires the same sqla hack for the time being
1679   my ($restrict_aliases, $select_aliases, $prefetch_aliases);
1680   {
1681     # produce stuff unquoted, so it can be scanned
1682     my $sql_maker = $self->sql_maker;
1683     local $sql_maker->{quote_char};
1684     my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1685     $sep = "\Q$sep\E";
1686
1687     my $non_prefetch_select_sql = $sql_maker->_recurse_fields ($inner_select);
1688     my $prefetch_select_sql = $sql_maker->_recurse_fields ($outer_attrs->{_prefetch_select});
1689     my $where_sql = $sql_maker->where ($where);
1690     my $group_by_sql = $sql_maker->_order_by({
1691       map { $_ => $inner_attrs->{$_} } qw/group_by having/
1692     });
1693     my @non_prefetch_order_by_chunks = (map
1694       { ref $_ ? $_->[0] : $_ }
1695       $sql_maker->_order_by_chunks ($inner_attrs->{order_by})
1696     );
1697
1698
1699     for my $alias (keys %original_join_info) {
1700       my $seen_re = qr/\b $alias $sep/x;
1701
1702       for my $piece ($where_sql, $group_by_sql, @non_prefetch_order_by_chunks ) {
1703         if ($piece =~ $seen_re) {
1704           $restrict_aliases->{$alias} = 1;
1705         }
1706       }
1707
1708       if ($non_prefetch_select_sql =~ $seen_re) {
1709           $select_aliases->{$alias} = 1;
1710       }
1711
1712       if ($prefetch_select_sql =~ $seen_re) {
1713           $prefetch_aliases->{$alias} = 1;
1714       }
1715
1716     }
1717   }
1718
1719   # Add any non-left joins to the restriction list (such joins are indeed restrictions)
1720   for my $j (values %original_join_info) {
1721     my $alias = $j->{-alias} or next;
1722     $restrict_aliases->{$alias} = 1 if (
1723       (not $j->{-join_type})
1724         or
1725       ($j->{-join_type} !~ /^left (?: \s+ outer)? $/xi)
1726     );
1727   }
1728
1729   # mark all join parents as mentioned
1730   # (e.g.  join => { cds => 'tracks' } - tracks will need to bring cds too )
1731   for my $collection ($restrict_aliases, $select_aliases) {
1732     for my $alias (keys %$collection) {
1733       $collection->{$_} = 1
1734         for (@{ $original_join_info{$alias}{-join_path} || [] });
1735     }
1736   }
1737
1738   # construct the inner $from for the subquery
1739   my %inner_joins = (map { %{$_ || {}} } ($restrict_aliases, $select_aliases) );
1740   my @inner_from;
1741   for my $j (@$from) {
1742     push @inner_from, $j if $inner_joins{$j->[0]{-alias}};
1743   }
1744
1745   # if a multi-type join was needed in the subquery ("multi" is indicated by
1746   # presence in {collapse}) - add a group_by to simulate the collapse in the subq
1747   unless ($inner_attrs->{group_by}) {
1748     for my $alias (keys %inner_joins) {
1749
1750       # the dot comes from some weirdness in collapse
1751       # remove after the rewrite
1752       if ($attrs->{collapse}{".$alias"}) {
1753         $inner_attrs->{group_by} ||= $inner_select;
1754         last;
1755       }
1756     }
1757   }
1758
1759   # demote the inner_from head
1760   $inner_from[0] = $inner_from[0][0];
1761
1762   # generate the subquery
1763   my $subq = $self->_select_args_to_query (
1764     \@inner_from,
1765     $inner_select,
1766     $where,
1767     $inner_attrs,
1768   );
1769
1770   my $subq_joinspec = {
1771     -alias => $attrs->{alias},
1772     -source_handle => $inner_from[0]{-source_handle},
1773     $attrs->{alias} => $subq,
1774   };
1775
1776   # Generate the outer from - this is relatively easy (really just replace
1777   # the join slot with the subquery), with a major caveat - we can not
1778   # join anything that is non-selecting (not part of the prefetch), but at
1779   # the same time is a multi-type relationship, as it will explode the result.
1780   #
1781   # There are two possibilities here
1782   # - either the join is non-restricting, in which case we simply throw it away
1783   # - it is part of the restrictions, in which case we need to collapse the outer
1784   #   result by tackling yet another group_by to the outside of the query
1785
1786   # so first generate the outer_from, up to the substitution point
1787   my @outer_from;
1788   while (my $j = shift @$from) {
1789     if ($j->[0]{-alias} eq $attrs->{alias}) { # time to swap
1790       push @outer_from, [
1791         $subq_joinspec,
1792         @{$j}[1 .. $#$j],
1793       ];
1794       last; # we'll take care of what's left in $from below
1795     }
1796     else {
1797       push @outer_from, $j;
1798     }
1799   }
1800
1801   # see what's left - throw away if not selecting/restricting
1802   # also throw in a group_by if restricting to guard against
1803   # cross-join explosions
1804   #
1805   while (my $j = shift @$from) {
1806     my $alias = $j->[0]{-alias};
1807
1808     if ($select_aliases->{$alias} || $prefetch_aliases->{$alias}) {
1809       push @outer_from, $j;
1810     }
1811     elsif ($restrict_aliases->{$alias}) {
1812       push @outer_from, $j;
1813
1814       # FIXME - this should be obviated by SQLA2, as I'll be able to 
1815       # have restrict_inner and restrict_outer... or something to that
1816       # effect... I think...
1817
1818       # FIXME2 - I can't find a clean way to determine if a particular join
1819       # is a multi - instead I am just treating everything as a potential
1820       # explosive join (ribasushi)
1821       #
1822       # if (my $handle = $j->[0]{-source_handle}) {
1823       #   my $rsrc = $handle->resolve;
1824       #   ... need to bail out of the following if this is not a multi,
1825       #       as it will be much easier on the db ...
1826
1827           $outer_attrs->{group_by} ||= $outer_select;
1828       # }
1829     }
1830   }
1831
1832   # demote the outer_from head
1833   $outer_from[0] = $outer_from[0][0];
1834
1835   # This is totally horrific - the $where ends up in both the inner and outer query
1836   # Unfortunately not much can be done until SQLA2 introspection arrives, and even
1837   # then if where conditions apply to the *right* side of the prefetch, you may have
1838   # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
1839   # the outer select to exclude joins you didin't want in the first place
1840   #
1841   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
1842   return (\@outer_from, $outer_select, $where, $outer_attrs);
1843 }
1844
1845 sub _resolve_ident_sources {
1846   my ($self, $ident) = @_;
1847
1848   my $alias2source = {};
1849   my $rs_alias;
1850
1851   # the reason this is so contrived is that $ident may be a {from}
1852   # structure, specifying multiple tables to join
1853   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1854     # this is compat mode for insert/update/delete which do not deal with aliases
1855     $alias2source->{me} = $ident;
1856     $rs_alias = 'me';
1857   }
1858   elsif (ref $ident eq 'ARRAY') {
1859
1860     for (@$ident) {
1861       my $tabinfo;
1862       if (ref $_ eq 'HASH') {
1863         $tabinfo = $_;
1864         $rs_alias = $tabinfo->{-alias};
1865       }
1866       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
1867         $tabinfo = $_->[0];
1868       }
1869
1870       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
1871         if ($tabinfo->{-source_handle});
1872     }
1873   }
1874
1875   return ($alias2source, $rs_alias);
1876 }
1877
1878 # Takes $ident, \@column_names
1879 #
1880 # returns { $column_name => \%column_info, ... }
1881 # also note: this adds -result_source => $rsrc to the column info
1882 #
1883 # usage:
1884 #   my $col_sources = $self->_resolve_column_info($ident, @column_names);
1885 sub _resolve_column_info {
1886   my ($self, $ident, $colnames) = @_;
1887   my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
1888
1889   my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1890   $sep = "\Q$sep\E";
1891
1892   my (%return, %seen_cols);
1893
1894   # compile a global list of column names, to be able to properly
1895   # disambiguate unqualified column names (if at all possible)
1896   for my $alias (keys %$alias2src) {
1897     my $rsrc = $alias2src->{$alias};
1898     for my $colname ($rsrc->columns) {
1899       push @{$seen_cols{$colname}}, $alias;
1900     }
1901   }
1902
1903   COLUMN:
1904   foreach my $col (@$colnames) {
1905     my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
1906
1907     unless ($alias) {
1908       # see if the column was seen exactly once (so we know which rsrc it came from)
1909       if ($seen_cols{$colname} and @{$seen_cols{$colname}} == 1) {
1910         $alias = $seen_cols{$colname}[0];
1911       }
1912       else {
1913         next COLUMN;
1914       }
1915     }
1916
1917     my $rsrc = $alias2src->{$alias};
1918     $return{$col} = $rsrc && {
1919       %{$rsrc->column_info($colname)},
1920       -result_source => $rsrc,
1921       -source_alias => $alias,
1922     };
1923   }
1924
1925   return \%return;
1926 }
1927
1928 # Returns a counting SELECT for a simple count
1929 # query. Abstracted so that a storage could override
1930 # this to { count => 'firstcol' } or whatever makes
1931 # sense as a performance optimization
1932 sub _count_select {
1933   #my ($self, $source, $rs_attrs) = @_;
1934   return { count => '*' };
1935 }
1936
1937 # Returns a SELECT which will end up in the subselect
1938 # There may or may not be a group_by, as the subquery
1939 # might have been called to accomodate a limit
1940 #
1941 # Most databases would be happy with whatever ends up
1942 # here, but some choke in various ways.
1943 #
1944 sub _subq_count_select {
1945   my ($self, $source, $rs_attrs) = @_;
1946   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
1947
1948   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1949   return @pcols ? \@pcols : [ 1 ];
1950 }
1951
1952
1953 sub source_bind_attributes {
1954   my ($self, $source) = @_;
1955
1956   my $bind_attributes;
1957   foreach my $column ($source->columns) {
1958
1959     my $data_type = $source->column_info($column)->{data_type} || '';
1960     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1961      if $data_type;
1962   }
1963
1964   return $bind_attributes;
1965 }
1966
1967 =head2 select
1968
1969 =over 4
1970
1971 =item Arguments: $ident, $select, $condition, $attrs
1972
1973 =back
1974
1975 Handle a SQL select statement.
1976
1977 =cut
1978
1979 sub select {
1980   my $self = shift;
1981   my ($ident, $select, $condition, $attrs) = @_;
1982   return $self->cursor_class->new($self, \@_, $attrs);
1983 }
1984
1985 sub select_single {
1986   my $self = shift;
1987   my ($rv, $sth, @bind) = $self->_select(@_);
1988   my @row = $sth->fetchrow_array;
1989   my @nextrow = $sth->fetchrow_array if @row;
1990   if(@row && @nextrow) {
1991     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1992   }
1993   # Need to call finish() to work round broken DBDs
1994   $sth->finish();
1995   return @row;
1996 }
1997
1998 =head2 sth
1999
2000 =over 4
2001
2002 =item Arguments: $sql
2003
2004 =back
2005
2006 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2007
2008 =cut
2009
2010 sub _dbh_sth {
2011   my ($self, $dbh, $sql) = @_;
2012
2013   # 3 is the if_active parameter which avoids active sth re-use
2014   my $sth = $self->disable_sth_caching
2015     ? $dbh->prepare($sql)
2016     : $dbh->prepare_cached($sql, {}, 3);
2017
2018   # XXX You would think RaiseError would make this impossible,
2019   #  but apparently that's not true :(
2020   $self->throw_exception($dbh->errstr) if !$sth;
2021
2022   $sth;
2023 }
2024
2025 sub sth {
2026   my ($self, $sql) = @_;
2027   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2028 }
2029
2030 sub _dbh_columns_info_for {
2031   my ($self, $dbh, $table) = @_;
2032
2033   if ($dbh->can('column_info')) {
2034     my %result;
2035     eval {
2036       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2037       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2038       $sth->execute();
2039       while ( my $info = $sth->fetchrow_hashref() ){
2040         my %column_info;
2041         $column_info{data_type}   = $info->{TYPE_NAME};
2042         $column_info{size}      = $info->{COLUMN_SIZE};
2043         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2044         $column_info{default_value} = $info->{COLUMN_DEF};
2045         my $col_name = $info->{COLUMN_NAME};
2046         $col_name =~ s/^\"(.*)\"$/$1/;
2047
2048         $result{$col_name} = \%column_info;
2049       }
2050     };
2051     return \%result if !$@ && scalar keys %result;
2052   }
2053
2054   my %result;
2055   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2056   $sth->execute;
2057   my @columns = @{$sth->{NAME_lc}};
2058   for my $i ( 0 .. $#columns ){
2059     my %column_info;
2060     $column_info{data_type} = $sth->{TYPE}->[$i];
2061     $column_info{size} = $sth->{PRECISION}->[$i];
2062     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2063
2064     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2065       $column_info{data_type} = $1;
2066       $column_info{size}    = $2;
2067     }
2068
2069     $result{$columns[$i]} = \%column_info;
2070   }
2071   $sth->finish;
2072
2073   foreach my $col (keys %result) {
2074     my $colinfo = $result{$col};
2075     my $type_num = $colinfo->{data_type};
2076     my $type_name;
2077     if(defined $type_num && $dbh->can('type_info')) {
2078       my $type_info = $dbh->type_info($type_num);
2079       $type_name = $type_info->{TYPE_NAME} if $type_info;
2080       $colinfo->{data_type} = $type_name if $type_name;
2081     }
2082   }
2083
2084   return \%result;
2085 }
2086
2087 sub columns_info_for {
2088   my ($self, $table) = @_;
2089   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2090 }
2091
2092 =head2 last_insert_id
2093
2094 Return the row id of the last insert.
2095
2096 =cut
2097
2098 sub _dbh_last_insert_id {
2099     # All Storage's need to register their own _dbh_last_insert_id
2100     # the old SQLite-based method was highly inappropriate
2101
2102     my $self = shift;
2103     my $class = ref $self;
2104     $self->throw_exception (<<EOE);
2105
2106 No _dbh_last_insert_id() method found in $class.
2107 Since the method of obtaining the autoincrement id of the last insert
2108 operation varies greatly between different databases, this method must be
2109 individually implemented for every storage class.
2110 EOE
2111 }
2112
2113 sub last_insert_id {
2114   my $self = shift;
2115   $self->_dbh_last_insert_id ($self->_dbh, @_);
2116 }
2117
2118 =head2 _native_data_type
2119
2120 =over 4
2121
2122 =item Arguments: $type_name
2123
2124 =back
2125
2126 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2127 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2128 L<::Sybase|DBIx::Class::Storage::DBI::Sybase>.
2129
2130 The default implementation returns C<undef>, implement in your Storage driver if
2131 you need this functionality.
2132
2133 Should map types from other databases to the native RDBMS type, for example
2134 C<VARCHAR2> to C<VARCHAR>.
2135
2136 Types with modifiers should map to the underlying data type. For example,
2137 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2138
2139 Composite types should map to the container type, for example
2140 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2141
2142 =cut
2143
2144 sub _native_data_type {
2145   #my ($self, $data_type) = @_;
2146   return undef
2147 }
2148
2149 =head2 sqlt_type
2150
2151 Returns the database driver name.
2152
2153 =cut
2154
2155 sub sqlt_type {
2156   my ($self) = @_;
2157
2158   if (not $self->_driver_determined) {
2159     $self->_determine_driver;
2160     goto $self->can ('sqlt_type');
2161   }
2162
2163   $self->_get_dbh->{Driver}->{Name};
2164 }
2165
2166 =head2 bind_attribute_by_data_type
2167
2168 Given a datatype from column info, returns a database specific bind
2169 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2170 let the database planner just handle it.
2171
2172 Generally only needed for special case column types, like bytea in postgres.
2173
2174 =cut
2175
2176 sub bind_attribute_by_data_type {
2177     return;
2178 }
2179
2180 =head2 is_datatype_numeric
2181
2182 Given a datatype from column_info, returns a boolean value indicating if
2183 the current RDBMS considers it a numeric value. This controls how
2184 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2185 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2186 be performed instead of the usual C<eq>.
2187
2188 =cut
2189
2190 sub is_datatype_numeric {
2191   my ($self, $dt) = @_;
2192
2193   return 0 unless $dt;
2194
2195   return $dt =~ /^ (?:
2196     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2197   ) $/ix;
2198 }
2199
2200
2201 =head2 create_ddl_dir (EXPERIMENTAL)
2202
2203 =over 4
2204
2205 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2206
2207 =back
2208
2209 Creates a SQL file based on the Schema, for each of the specified
2210 database engines in C<\@databases> in the given directory.
2211 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2212
2213 Given a previous version number, this will also create a file containing
2214 the ALTER TABLE statements to transform the previous schema into the
2215 current one. Note that these statements may contain C<DROP TABLE> or
2216 C<DROP COLUMN> statements that can potentially destroy data.
2217
2218 The file names are created using the C<ddl_filename> method below, please
2219 override this method in your schema if you would like a different file
2220 name format. For the ALTER file, the same format is used, replacing
2221 $version in the name with "$preversion-$version".
2222
2223 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2224 The most common value for this would be C<< { add_drop_table => 1 } >>
2225 to have the SQL produced include a C<DROP TABLE> statement for each table
2226 created. For quoting purposes supply C<quote_table_names> and
2227 C<quote_field_names>.
2228
2229 If no arguments are passed, then the following default values are assumed:
2230
2231 =over 4
2232
2233 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2234
2235 =item version    - $schema->schema_version
2236
2237 =item directory  - './'
2238
2239 =item preversion - <none>
2240
2241 =back
2242
2243 By default, C<\%sqlt_args> will have
2244
2245  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2246
2247 merged with the hash passed in. To disable any of those features, pass in a
2248 hashref like the following
2249
2250  { ignore_constraint_names => 0, # ... other options }
2251
2252
2253 Note that this feature is currently EXPERIMENTAL and may not work correctly
2254 across all databases, or fully handle complex relationships.
2255
2256 WARNING: Please check all SQL files created, before applying them.
2257
2258 =cut
2259
2260 sub create_ddl_dir {
2261   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2262
2263   if(!$dir || !-d $dir) {
2264     carp "No directory given, using ./\n";
2265     $dir = "./";
2266   }
2267   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2268   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2269
2270   my $schema_version = $schema->schema_version || '1.x';
2271   $version ||= $schema_version;
2272
2273   $sqltargs = {
2274     add_drop_table => 1,
2275     ignore_constraint_names => 1,
2276     ignore_index_names => 1,
2277     %{$sqltargs || {}}
2278   };
2279
2280   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
2281       . $self->_check_sqlt_message . q{'})
2282           if !$self->_check_sqlt_version;
2283
2284   my $sqlt = SQL::Translator->new( $sqltargs );
2285
2286   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2287   my $sqlt_schema = $sqlt->translate({ data => $schema })
2288     or $self->throw_exception ($sqlt->error);
2289
2290   foreach my $db (@$databases) {
2291     $sqlt->reset();
2292     $sqlt->{schema} = $sqlt_schema;
2293     $sqlt->producer($db);
2294
2295     my $file;
2296     my $filename = $schema->ddl_filename($db, $version, $dir);
2297     if (-e $filename && ($version eq $schema_version )) {
2298       # if we are dumping the current version, overwrite the DDL
2299       carp "Overwriting existing DDL file - $filename";
2300       unlink($filename);
2301     }
2302
2303     my $output = $sqlt->translate;
2304     if(!$output) {
2305       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2306       next;
2307     }
2308     if(!open($file, ">$filename")) {
2309       $self->throw_exception("Can't open $filename for writing ($!)");
2310       next;
2311     }
2312     print $file $output;
2313     close($file);
2314
2315     next unless ($preversion);
2316
2317     require SQL::Translator::Diff;
2318
2319     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2320     if(!-e $prefilename) {
2321       carp("No previous schema file found ($prefilename)");
2322       next;
2323     }
2324
2325     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2326     if(-e $difffile) {
2327       carp("Overwriting existing diff file - $difffile");
2328       unlink($difffile);
2329     }
2330
2331     my $source_schema;
2332     {
2333       my $t = SQL::Translator->new($sqltargs);
2334       $t->debug( 0 );
2335       $t->trace( 0 );
2336
2337       $t->parser( $db )
2338         or $self->throw_exception ($t->error);
2339
2340       my $out = $t->translate( $prefilename )
2341         or $self->throw_exception ($t->error);
2342
2343       $source_schema = $t->schema;
2344
2345       $source_schema->name( $prefilename )
2346         unless ( $source_schema->name );
2347     }
2348
2349     # The "new" style of producers have sane normalization and can support
2350     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2351     # And we have to diff parsed SQL against parsed SQL.
2352     my $dest_schema = $sqlt_schema;
2353
2354     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2355       my $t = SQL::Translator->new($sqltargs);
2356       $t->debug( 0 );
2357       $t->trace( 0 );
2358
2359       $t->parser( $db )
2360         or $self->throw_exception ($t->error);
2361
2362       my $out = $t->translate( $filename )
2363         or $self->throw_exception ($t->error);
2364
2365       $dest_schema = $t->schema;
2366
2367       $dest_schema->name( $filename )
2368         unless $dest_schema->name;
2369     }
2370
2371     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2372                                                   $dest_schema,   $db,
2373                                                   $sqltargs
2374                                                  );
2375     if(!open $file, ">$difffile") {
2376       $self->throw_exception("Can't write to $difffile ($!)");
2377       next;
2378     }
2379     print $file $diff;
2380     close($file);
2381   }
2382 }
2383
2384 =head2 deployment_statements
2385
2386 =over 4
2387
2388 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2389
2390 =back
2391
2392 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2393
2394 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2395 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2396
2397 C<$directory> is used to return statements from files in a previously created
2398 L</create_ddl_dir> directory and is optional. The filenames are constructed
2399 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2400
2401 If no C<$directory> is specified then the statements are constructed on the
2402 fly using L<SQL::Translator> and C<$version> is ignored.
2403
2404 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2405
2406 =cut
2407
2408 sub deployment_statements {
2409   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2410   $type ||= $self->sqlt_type;
2411   $version ||= $schema->schema_version || '1.x';
2412   $dir ||= './';
2413   my $filename = $schema->ddl_filename($type, $version, $dir);
2414   if(-f $filename)
2415   {
2416       my $file;
2417       open($file, "<$filename")
2418         or $self->throw_exception("Can't open $filename ($!)");
2419       my @rows = <$file>;
2420       close($file);
2421       return join('', @rows);
2422   }
2423
2424   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
2425       . $self->_check_sqlt_message . q{'})
2426           if !$self->_check_sqlt_version;
2427
2428   # sources needs to be a parser arg, but for simplicty allow at top level
2429   # coming in
2430   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2431       if exists $sqltargs->{sources};
2432
2433   my $tr = SQL::Translator->new(
2434     producer => "SQL::Translator::Producer::${type}",
2435     %$sqltargs,
2436     parser => 'SQL::Translator::Parser::DBIx::Class',
2437     data => $schema,
2438   );
2439   return $tr->translate;
2440 }
2441
2442 sub deploy {
2443   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2444   my $deploy = sub {
2445     my $line = shift;
2446     return if($line =~ /^--/);
2447     return if(!$line);
2448     # next if($line =~ /^DROP/m);
2449     return if($line =~ /^BEGIN TRANSACTION/m);
2450     return if($line =~ /^COMMIT/m);
2451     return if $line =~ /^\s+$/; # skip whitespace only
2452     $self->_query_start($line);
2453     eval {
2454       # do a dbh_do cycle here, as we need some error checking in
2455       # place (even though we will ignore errors)
2456       $self->dbh_do (sub { $_[1]->do($line) });
2457     };
2458     if ($@) {
2459       carp qq{$@ (running "${line}")};
2460     }
2461     $self->_query_end($line);
2462   };
2463   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2464   if (@statements > 1) {
2465     foreach my $statement (@statements) {
2466       $deploy->( $statement );
2467     }
2468   }
2469   elsif (@statements == 1) {
2470     foreach my $line ( split(";\n", $statements[0])) {
2471       $deploy->( $line );
2472     }
2473   }
2474 }
2475
2476 =head2 datetime_parser
2477
2478 Returns the datetime parser class
2479
2480 =cut
2481
2482 sub datetime_parser {
2483   my $self = shift;
2484   return $self->{datetime_parser} ||= do {
2485     $self->_populate_dbh unless $self->_dbh;
2486     $self->build_datetime_parser(@_);
2487   };
2488 }
2489
2490 =head2 datetime_parser_type
2491
2492 Defines (returns) the datetime parser class - currently hardwired to
2493 L<DateTime::Format::MySQL>
2494
2495 =cut
2496
2497 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2498
2499 =head2 build_datetime_parser
2500
2501 See L</datetime_parser>
2502
2503 =cut
2504
2505 sub build_datetime_parser {
2506   my $self = shift;
2507   my $type = $self->datetime_parser_type(@_);
2508   eval "use ${type}";
2509   $self->throw_exception("Couldn't load ${type}: $@") if $@;
2510   return $type;
2511 }
2512
2513 {
2514     my $_check_sqlt_version; # private
2515     my $_check_sqlt_message; # private
2516     sub _check_sqlt_version {
2517         return $_check_sqlt_version if defined $_check_sqlt_version;
2518         eval 'use SQL::Translator "0.09003"';
2519         $_check_sqlt_message = $@ || '';
2520         $_check_sqlt_version = !$@;
2521     }
2522
2523     sub _check_sqlt_message {
2524         _check_sqlt_version if !defined $_check_sqlt_message;
2525         $_check_sqlt_message;
2526     }
2527 }
2528
2529 =head2 is_replicating
2530
2531 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2532 replicate from a master database.  Default is undef, which is the result
2533 returned by databases that don't support replication.
2534
2535 =cut
2536
2537 sub is_replicating {
2538     return;
2539
2540 }
2541
2542 =head2 lag_behind_master
2543
2544 Returns a number that represents a certain amount of lag behind a master db
2545 when a given storage is replicating.  The number is database dependent, but
2546 starts at zero and increases with the amount of lag. Default in undef
2547
2548 =cut
2549
2550 sub lag_behind_master {
2551     return;
2552 }
2553
2554 sub DESTROY {
2555   my $self = shift;
2556   $self->_verify_pid if $self->_dbh;
2557
2558   # some databases need this to stop spewing warnings
2559   if (my $dbh = $self->_dbh) {
2560     eval { $dbh->disconnect };
2561   }
2562
2563   $self->_dbh(undef);
2564 }
2565
2566 1;
2567
2568 =head1 USAGE NOTES
2569
2570 =head2 DBIx::Class and AutoCommit
2571
2572 DBIx::Class can do some wonderful magic with handling exceptions,
2573 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2574 (the default) combined with C<txn_do> for transaction support.
2575
2576 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2577 in an assumed transaction between commits, and you're telling us you'd
2578 like to manage that manually.  A lot of the magic protections offered by
2579 this module will go away.  We can't protect you from exceptions due to database
2580 disconnects because we don't know anything about how to restart your
2581 transactions.  You're on your own for handling all sorts of exceptional
2582 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2583 be with raw DBI.
2584
2585
2586 =head1 AUTHORS
2587
2588 Matt S. Trout <mst@shadowcatsystems.co.uk>
2589
2590 Andy Grundman <andy@hybridized.org>
2591
2592 =head1 LICENSE
2593
2594 You may distribute this code under the same terms as Perl itself.
2595
2596 =cut