Caelum was right to make _get_dbh private - reverting (and some code refactoring)
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base 'DBIx::Class::Storage';
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16
17 __PACKAGE__->mk_group_accessors('simple' =>
18   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
19      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
20 );
21
22 # the values for these accessors are picked out (and deleted) from
23 # the attribute hashref passed to connect_info
24 my @storage_options = qw/
25   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
26   disable_sth_caching unsafe auto_savepoint
27 /;
28 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
29
30
31 # default cursor class, overridable in connect_info attributes
32 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
33
34 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
35 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
36
37
38 =head1 NAME
39
40 DBIx::Class::Storage::DBI - DBI storage handler
41
42 =head1 SYNOPSIS
43
44   my $schema = MySchema->connect('dbi:SQLite:my.db');
45
46   $schema->storage->debug(1);
47   $schema->dbh_do("DROP TABLE authors");
48
49   $schema->resultset('Book')->search({
50      written_on => $schema->storage->datetime_parser(DateTime->now)
51   });
52
53 =head1 DESCRIPTION
54
55 This class represents the connection to an RDBMS via L<DBI>.  See
56 L<DBIx::Class::Storage> for general information.  This pod only
57 documents DBI-specific methods and behaviors.
58
59 =head1 METHODS
60
61 =cut
62
63 sub new {
64   my $new = shift->next::method(@_);
65
66   $new->transaction_depth(0);
67   $new->_sql_maker_opts({});
68   $new->{savepoints} = [];
69   $new->{_in_dbh_do} = 0;
70   $new->{_dbh_gen} = 0;
71
72   $new;
73 }
74
75 =head2 connect_info
76
77 This method is normally called by L<DBIx::Class::Schema/connection>, which
78 encapsulates its argument list in an arrayref before passing them here.
79
80 The argument list may contain:
81
82 =over
83
84 =item *
85
86 The same 4-element argument set one would normally pass to
87 L<DBI/connect>, optionally followed by
88 L<extra attributes|/DBIx::Class specific connection attributes>
89 recognized by DBIx::Class:
90
91   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
92
93 =item *
94
95 A single code reference which returns a connected
96 L<DBI database handle|DBI/connect> optionally followed by
97 L<extra attributes|/DBIx::Class specific connection attributes> recognized
98 by DBIx::Class:
99
100   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
101
102 =item *
103
104 A single hashref with all the attributes and the dsn/user/password
105 mixed together:
106
107   $connect_info_args = [{
108     dsn => $dsn,
109     user => $user,
110     password => $pass,
111     %dbi_attributes,
112     %extra_attributes,
113   }];
114
115 This is particularly useful for L<Catalyst> based applications, allowing the
116 following config (L<Config::General> style):
117
118   <Model::DB>
119     schema_class   App::DB
120     <connect_info>
121       dsn          dbi:mysql:database=test
122       user         testuser
123       password     TestPass
124       AutoCommit   1
125     </connect_info>
126   </Model::DB>
127
128 =back
129
130 Please note that the L<DBI> docs recommend that you always explicitly
131 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
132 recommends that it be set to I<1>, and that you perform transactions
133 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
134 to I<1> if you do not do explicitly set it to zero.  This is the default
135 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
136
137 =head3 DBIx::Class specific connection attributes
138
139 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
140 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
141 the following connection options. These options can be mixed in with your other
142 L<DBI> connection attributes, or placed in a seperate hashref
143 (C<\%extra_attributes>) as shown above.
144
145 Every time C<connect_info> is invoked, any previous settings for
146 these options will be cleared before setting the new ones, regardless of
147 whether any options are specified in the new C<connect_info>.
148
149
150 =over
151
152 =item on_connect_do
153
154 Specifies things to do immediately after connecting or re-connecting to
155 the database.  Its value may contain:
156
157 =over
158
159 =item a scalar
160
161 This contains one SQL statement to execute.
162
163 =item an array reference
164
165 This contains SQL statements to execute in order.  Each element contains
166 a string or a code reference that returns a string.
167
168 =item a code reference
169
170 This contains some code to execute.  Unlike code references within an
171 array reference, its return value is ignored.
172
173 =back
174
175 =item on_disconnect_do
176
177 Takes arguments in the same form as L</on_connect_do> and executes them
178 immediately before disconnecting from the database.
179
180 Note, this only runs if you explicitly call L</disconnect> on the
181 storage object.
182
183 =item on_connect_call
184
185 A more generalized form of L</on_connect_do> that calls the specified
186 C<connect_call_METHOD> methods in your storage driver.
187
188   on_connect_do => 'select 1'
189
190 is equivalent to:
191
192   on_connect_call => [ [ do_sql => 'select 1' ] ]
193
194 Its values may contain:
195
196 =over
197
198 =item a scalar
199
200 Will call the C<connect_call_METHOD> method.
201
202 =item a code reference
203
204 Will execute C<< $code->($storage) >>
205
206 =item an array reference
207
208 Each value can be a method name or code reference.
209
210 =item an array of arrays
211
212 For each array, the first item is taken to be the C<connect_call_> method name
213 or code reference, and the rest are parameters to it.
214
215 =back
216
217 Some predefined storage methods you may use:
218
219 =over
220
221 =item do_sql
222
223 Executes a SQL string or a code reference that returns a SQL string. This is
224 what L</on_connect_do> and L</on_disconnect_do> use.
225
226 It can take:
227
228 =over
229
230 =item a scalar
231
232 Will execute the scalar as SQL.
233
234 =item an arrayref
235
236 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
237 attributes hashref and bind values.
238
239 =item a code reference
240
241 Will execute C<< $code->($storage) >> and execute the return array refs as
242 above.
243
244 =back
245
246 =item datetime_setup
247
248 Execute any statements necessary to initialize the database session to return
249 and accept datetime/timestamp values used with
250 L<DBIx::Class::InflateColumn::DateTime>.
251
252 Only necessary for some databases, see your specific storage driver for
253 implementation details.
254
255 =back
256
257 =item on_disconnect_call
258
259 Takes arguments in the same form as L</on_connect_call> and executes them
260 immediately before disconnecting from the database.
261
262 Calls the C<disconnect_call_METHOD> methods as opposed to the
263 C<connect_call_METHOD> methods called by L</on_connect_call>.
264
265 Note, this only runs if you explicitly call L</disconnect> on the
266 storage object.
267
268 =item disable_sth_caching
269
270 If set to a true value, this option will disable the caching of
271 statement handles via L<DBI/prepare_cached>.
272
273 =item limit_dialect
274
275 Sets the limit dialect. This is useful for JDBC-bridge among others
276 where the remote SQL-dialect cannot be determined by the name of the
277 driver alone. See also L<SQL::Abstract::Limit>.
278
279 =item quote_char
280
281 Specifies what characters to use to quote table and column names. If
282 you use this you will want to specify L</name_sep> as well.
283
284 C<quote_char> expects either a single character, in which case is it
285 is placed on either side of the table/column name, or an arrayref of length
286 2 in which case the table/column name is placed between the elements.
287
288 For example under MySQL you should use C<< quote_char => '`' >>, and for
289 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
290
291 =item name_sep
292
293 This only needs to be used in conjunction with C<quote_char>, and is used to
294 specify the charecter that seperates elements (schemas, tables, columns) from
295 each other. In most cases this is simply a C<.>.
296
297 The consequences of not supplying this value is that L<SQL::Abstract>
298 will assume DBIx::Class' uses of aliases to be complete column
299 names. The output will look like I<"me.name"> when it should actually
300 be I<"me"."name">.
301
302 =item unsafe
303
304 This Storage driver normally installs its own C<HandleError>, sets
305 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
306 all database handles, including those supplied by a coderef.  It does this
307 so that it can have consistent and useful error behavior.
308
309 If you set this option to a true value, Storage will not do its usual
310 modifications to the database handle's attributes, and instead relies on
311 the settings in your connect_info DBI options (or the values you set in
312 your connection coderef, in the case that you are connecting via coderef).
313
314 Note that your custom settings can cause Storage to malfunction,
315 especially if you set a C<HandleError> handler that suppresses exceptions
316 and/or disable C<RaiseError>.
317
318 =item auto_savepoint
319
320 If this option is true, L<DBIx::Class> will use savepoints when nesting
321 transactions, making it possible to recover from failure in the inner
322 transaction without having to abort all outer transactions.
323
324 =item cursor_class
325
326 Use this argument to supply a cursor class other than the default
327 L<DBIx::Class::Storage::DBI::Cursor>.
328
329 =back
330
331 Some real-life examples of arguments to L</connect_info> and
332 L<DBIx::Class::Schema/connect>
333
334   # Simple SQLite connection
335   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
336
337   # Connect via subref
338   ->connect_info([ sub { DBI->connect(...) } ]);
339
340   # A bit more complicated
341   ->connect_info(
342     [
343       'dbi:Pg:dbname=foo',
344       'postgres',
345       'my_pg_password',
346       { AutoCommit => 1 },
347       { quote_char => q{"}, name_sep => q{.} },
348     ]
349   );
350
351   # Equivalent to the previous example
352   ->connect_info(
353     [
354       'dbi:Pg:dbname=foo',
355       'postgres',
356       'my_pg_password',
357       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
358     ]
359   );
360
361   # Same, but with hashref as argument
362   # See parse_connect_info for explanation
363   ->connect_info(
364     [{
365       dsn         => 'dbi:Pg:dbname=foo',
366       user        => 'postgres',
367       password    => 'my_pg_password',
368       AutoCommit  => 1,
369       quote_char  => q{"},
370       name_sep    => q{.},
371     }]
372   );
373
374   # Subref + DBIx::Class-specific connection options
375   ->connect_info(
376     [
377       sub { DBI->connect(...) },
378       {
379           quote_char => q{`},
380           name_sep => q{@},
381           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
382           disable_sth_caching => 1,
383       },
384     ]
385   );
386
387
388
389 =cut
390
391 sub connect_info {
392   my ($self, $info_arg) = @_;
393
394   return $self->_connect_info if !$info_arg;
395
396   my @args = @$info_arg;  # take a shallow copy for further mutilation
397   $self->_connect_info([@args]); # copy for _connect_info
398
399
400   # combine/pre-parse arguments depending on invocation style
401
402   my %attrs;
403   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
404     %attrs = %{ $args[1] || {} };
405     @args = $args[0];
406   }
407   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
408     %attrs = %{$args[0]};
409     @args = ();
410     for (qw/password user dsn/) {
411       unshift @args, delete $attrs{$_};
412     }
413   }
414   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
415     %attrs = (
416       % { $args[3] || {} },
417       % { $args[4] || {} },
418     );
419     @args = @args[0,1,2];
420   }
421
422   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
423   #  the new set of options
424   $self->_sql_maker(undef);
425   $self->_sql_maker_opts({});
426
427   if(keys %attrs) {
428     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
429       if(my $value = delete $attrs{$storage_opt}) {
430         $self->$storage_opt($value);
431       }
432     }
433     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
434       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
435         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
436       }
437     }
438   }
439
440   if (ref $args[0] eq 'CODE') {
441     # _connect() never looks past $args[0] in this case
442     %attrs = ()
443   } else {
444     %attrs = (
445       %{ $self->_default_dbi_connect_attributes || {} },
446       %attrs,
447     );
448   }
449
450   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
451   $self->_connect_info;
452 }
453
454 sub _default_dbi_connect_attributes {
455   return {
456     AutoCommit => 1,
457     RaiseError => 1,
458     PrintError => 0,
459   };
460 }
461
462 =head2 on_connect_do
463
464 This method is deprecated in favour of setting via L</connect_info>.
465
466 =cut
467
468 =head2 on_disconnect_do
469
470 This method is deprecated in favour of setting via L</connect_info>.
471
472 =cut
473
474 sub _parse_connect_do {
475   my ($self, $type) = @_;
476
477   my $val = $self->$type;
478   return () if not defined $val;
479
480   my @res;
481
482   if (not ref($val)) {
483     push @res, [ 'do_sql', $val ];
484   } elsif (ref($val) eq 'CODE') {
485     push @res, $val;
486   } elsif (ref($val) eq 'ARRAY') {
487     push @res, map { [ 'do_sql', $_ ] } @$val;
488   } else {
489     $self->throw_exception("Invalid type for $type: ".ref($val));
490   }
491
492   return \@res;
493 }
494
495 =head2 dbh_do
496
497 Arguments: ($subref | $method_name), @extra_coderef_args?
498
499 Execute the given $subref or $method_name using the new exception-based
500 connection management.
501
502 The first two arguments will be the storage object that C<dbh_do> was called
503 on and a database handle to use.  Any additional arguments will be passed
504 verbatim to the called subref as arguments 2 and onwards.
505
506 Using this (instead of $self->_dbh or $self->dbh) ensures correct
507 exception handling and reconnection (or failover in future subclasses).
508
509 Your subref should have no side-effects outside of the database, as
510 there is the potential for your subref to be partially double-executed
511 if the database connection was stale/dysfunctional.
512
513 Example:
514
515   my @stuff = $schema->storage->dbh_do(
516     sub {
517       my ($storage, $dbh, @cols) = @_;
518       my $cols = join(q{, }, @cols);
519       $dbh->selectrow_array("SELECT $cols FROM foo");
520     },
521     @column_list
522   );
523
524 =cut
525
526 sub dbh_do {
527   my $self = shift;
528   my $code = shift;
529
530   my $dbh = $self->_dbh;
531
532   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
533       || $self->{transaction_depth};
534
535   local $self->{_in_dbh_do} = 1;
536
537   my @result;
538   my $want_array = wantarray;
539
540   eval {
541     $self->_verify_pid if $dbh;
542     if(!$self->_dbh) {
543         $self->_populate_dbh;
544         $dbh = $self->_dbh;
545     }
546
547     if($want_array) {
548         @result = $self->$code($dbh, @_);
549     }
550     elsif(defined $want_array) {
551         $result[0] = $self->$code($dbh, @_);
552     }
553     else {
554         $self->$code($dbh, @_);
555     }
556   };
557
558   # ->connected might unset $@ - copy
559   my $exception = $@;
560   if(!$exception) { return $want_array ? @result : $result[0] }
561
562   $self->throw_exception($exception) if $self->connected;
563
564   # We were not connected - reconnect and retry, but let any
565   #  exception fall right through this time
566   $self->_populate_dbh;
567   $self->$code($self->_dbh, @_);
568 }
569
570 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
571 # It also informs dbh_do to bypass itself while under the direction of txn_do,
572 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
573 sub txn_do {
574   my $self = shift;
575   my $coderef = shift;
576
577   ref $coderef eq 'CODE' or $self->throw_exception
578     ('$coderef must be a CODE reference');
579
580   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
581
582   local $self->{_in_dbh_do} = 1;
583
584   my @result;
585   my $want_array = wantarray;
586
587   my $tried = 0;
588   while(1) {
589     eval {
590       $self->_verify_pid if $self->_dbh;
591       $self->_populate_dbh if !$self->_dbh;
592
593       $self->txn_begin;
594       if($want_array) {
595           @result = $coderef->(@_);
596       }
597       elsif(defined $want_array) {
598           $result[0] = $coderef->(@_);
599       }
600       else {
601           $coderef->(@_);
602       }
603       $self->txn_commit;
604     };
605
606     # ->connected might unset $@ - copy
607     my $exception = $@;
608     if(!$exception) { return $want_array ? @result : $result[0] }
609
610     if($tried++ || $self->connected) {
611       eval { $self->txn_rollback };
612       my $rollback_exception = $@;
613       if($rollback_exception) {
614         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
615         $self->throw_exception($exception)  # propagate nested rollback
616           if $rollback_exception =~ /$exception_class/;
617
618         $self->throw_exception(
619           "Transaction aborted: ${exception}. "
620           . "Rollback failed: ${rollback_exception}"
621         );
622       }
623       $self->throw_exception($exception)
624     }
625
626     # We were not connected, and was first try - reconnect and retry
627     # via the while loop
628     $self->_populate_dbh;
629   }
630 }
631
632 =head2 disconnect
633
634 Our C<disconnect> method also performs a rollback first if the
635 database is not in C<AutoCommit> mode.
636
637 =cut
638
639 sub disconnect {
640   my ($self) = @_;
641
642   if( $self->_dbh ) {
643     my @actions;
644
645     push @actions, ( $self->on_disconnect_call || () );
646     push @actions, $self->_parse_connect_do ('on_disconnect_do');
647
648     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
649
650     $self->_dbh->rollback unless $self->_dbh_autocommit;
651     $self->_dbh->disconnect;
652     $self->_dbh(undef);
653     $self->{_dbh_gen}++;
654   }
655 }
656
657 =head2 with_deferred_fk_checks
658
659 =over 4
660
661 =item Arguments: C<$coderef>
662
663 =item Return Value: The return value of $coderef
664
665 =back
666
667 Storage specific method to run the code ref with FK checks deferred or
668 in MySQL's case disabled entirely.
669
670 =cut
671
672 # Storage subclasses should override this
673 sub with_deferred_fk_checks {
674   my ($self, $sub) = @_;
675
676   $sub->();
677 }
678
679 =head2 connected
680
681 =over
682
683 =item Arguments: none
684
685 =item Return Value: 1|0
686
687 =back
688
689 Verifies that the the current database handle is active and ready to execute
690 an SQL statement (i.e. the connection did not get stale, server is still
691 answering, etc.) This method is used internally by L</dbh>.
692
693 =cut
694
695 sub connected {
696   my $self = shift;
697   return 0 unless $self->_seems_connected;
698
699   #be on the safe side
700   local $self->_dbh->{RaiseError} = 1;
701
702   return $self->_ping;
703 }
704
705 sub _seems_connected {
706   my $self = shift;
707
708   my $dbh = $self->_dbh
709     or return 0;
710
711   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
712     $self->_dbh(undef);
713     $self->{_dbh_gen}++;
714     return 0;
715   }
716   else {
717     $self->_verify_pid;
718     return 0 if !$self->_dbh;
719   }
720
721   return $dbh->FETCH('Active');
722 }
723
724 sub _ping {
725   my $self = shift;
726
727   my $dbh = $self->_dbh or return 0;
728
729   return $dbh->ping;
730 }
731
732 # handle pid changes correctly
733 #  NOTE: assumes $self->_dbh is a valid $dbh
734 sub _verify_pid {
735   my ($self) = @_;
736
737   return if defined $self->_conn_pid && $self->_conn_pid == $$;
738
739   $self->_dbh->{InactiveDestroy} = 1;
740   $self->_dbh(undef);
741   $self->{_dbh_gen}++;
742
743   return;
744 }
745
746 sub ensure_connected {
747   my ($self) = @_;
748
749   unless ($self->connected) {
750     $self->_populate_dbh;
751   }
752 }
753
754 =head2 dbh
755
756 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
757 is guaranteed to be healthy by implicitly calling L</connected>, and if
758 necessary performing a reconnection before returning. Keep in mind that this
759 is very B<expensive> on some database engines. Consider using L<dbh_do>
760 instead.
761
762 =cut
763
764 sub dbh {
765   my ($self) = @_;
766
767   if (not $self->_dbh) {
768     $self->_populate_dbh;
769   } else {
770     $self->ensure_connected;
771   }
772   return $self->_dbh;
773 }
774
775 # this is the internal "get dbh or connect (don't check)" method
776 sub _get_dbh {
777   my $self = shift;
778   $self->_populate_dbh unless $self->_dbh;
779   return $self->_dbh;
780 }
781
782 sub _sql_maker_args {
783     my ($self) = @_;
784
785     return (
786       bindtype=>'columns',
787       array_datatypes => 1,
788       limit_dialect => $self->_get_dbh,
789       %{$self->_sql_maker_opts}
790     );
791 }
792
793 sub sql_maker {
794   my ($self) = @_;
795   unless ($self->_sql_maker) {
796     my $sql_maker_class = $self->sql_maker_class;
797     $self->ensure_class_loaded ($sql_maker_class);
798     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
799   }
800   return $self->_sql_maker;
801 }
802
803 sub _rebless {}
804
805 sub _populate_dbh {
806   my ($self) = @_;
807
808   my @info = @{$self->_dbi_connect_info || []};
809   $self->_dbh(undef); # in case ->connected failed we might get sent here
810   $self->_dbh($self->_connect(@info));
811
812   $self->_conn_pid($$);
813   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
814
815   $self->_determine_driver;
816
817   # Always set the transaction depth on connect, since
818   #  there is no transaction in progress by definition
819   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
820
821   $self->_run_connection_actions unless $self->{_in_determine_driver};
822 }
823
824 sub _run_connection_actions {
825   my $self = shift;
826   my @actions;
827
828   push @actions, ( $self->on_connect_call || () );
829   push @actions, $self->_parse_connect_do ('on_connect_do');
830
831   $self->_do_connection_actions(connect_call_ => $_) for @actions;
832 }
833
834 sub _determine_driver {
835   my ($self) = @_;
836
837   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
838     my $started_unconnected = 0;
839     local $self->{_in_determine_driver} = 1;
840
841     if (ref($self) eq __PACKAGE__) {
842       my $driver;
843       if ($self->_dbh) { # we are connected
844         $driver = $self->_dbh->{Driver}{Name};
845       } else {
846         # try to use dsn to not require being connected, the driver may still
847         # force a connection in _rebless to determine version
848         ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
849         $started_unconnected = 1;
850       }
851
852       my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
853       if ($self->load_optional_class($storage_class)) {
854         mro::set_mro($storage_class, 'c3');
855         bless $self, $storage_class;
856         $self->_rebless();
857       }
858     }
859
860     $self->_driver_determined(1);
861
862     $self->_run_connection_actions
863         if $started_unconnected && defined $self->_dbh;
864   }
865 }
866
867 sub _do_connection_actions {
868   my $self          = shift;
869   my $method_prefix = shift;
870   my $call          = shift;
871
872   if (not ref($call)) {
873     my $method = $method_prefix . $call;
874     $self->$method(@_);
875   } elsif (ref($call) eq 'CODE') {
876     $self->$call(@_);
877   } elsif (ref($call) eq 'ARRAY') {
878     if (ref($call->[0]) ne 'ARRAY') {
879       $self->_do_connection_actions($method_prefix, $_) for @$call;
880     } else {
881       $self->_do_connection_actions($method_prefix, @$_) for @$call;
882     }
883   } else {
884     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
885   }
886
887   return $self;
888 }
889
890 sub connect_call_do_sql {
891   my $self = shift;
892   $self->_do_query(@_);
893 }
894
895 sub disconnect_call_do_sql {
896   my $self = shift;
897   $self->_do_query(@_);
898 }
899
900 # override in db-specific backend when necessary
901 sub connect_call_datetime_setup { 1 }
902
903 sub _do_query {
904   my ($self, $action) = @_;
905
906   if (ref $action eq 'CODE') {
907     $action = $action->($self);
908     $self->_do_query($_) foreach @$action;
909   }
910   else {
911     # Most debuggers expect ($sql, @bind), so we need to exclude
912     # the attribute hash which is the second argument to $dbh->do
913     # furthermore the bind values are usually to be presented
914     # as named arrayref pairs, so wrap those here too
915     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
916     my $sql = shift @do_args;
917     my $attrs = shift @do_args;
918     my @bind = map { [ undef, $_ ] } @do_args;
919
920     $self->_query_start($sql, @bind);
921     $self->_dbh->do($sql, $attrs, @do_args);
922     $self->_query_end($sql, @bind);
923   }
924
925   return $self;
926 }
927
928 sub _connect {
929   my ($self, @info) = @_;
930
931   $self->throw_exception("You failed to provide any connection info")
932     if !@info;
933
934   my ($old_connect_via, $dbh);
935
936   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
937     $old_connect_via = $DBI::connect_via;
938     $DBI::connect_via = 'connect';
939   }
940
941   eval {
942     if(ref $info[0] eq 'CODE') {
943        $dbh = &{$info[0]}
944     }
945     else {
946        $dbh = DBI->connect(@info);
947     }
948
949     if($dbh && !$self->unsafe) {
950       my $weak_self = $self;
951       Scalar::Util::weaken($weak_self);
952       $dbh->{HandleError} = sub {
953           if ($weak_self) {
954             $weak_self->throw_exception("DBI Exception: $_[0]");
955           }
956           else {
957             croak ("DBI Exception: $_[0]");
958           }
959       };
960       $dbh->{ShowErrorStatement} = 1;
961       $dbh->{RaiseError} = 1;
962       $dbh->{PrintError} = 0;
963     }
964   };
965
966   $DBI::connect_via = $old_connect_via if $old_connect_via;
967
968   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
969     if !$dbh || $@;
970
971   $self->_dbh_autocommit($dbh->{AutoCommit});
972
973   $dbh;
974 }
975
976 sub svp_begin {
977   my ($self, $name) = @_;
978
979   $name = $self->_svp_generate_name
980     unless defined $name;
981
982   $self->throw_exception ("You can't use savepoints outside a transaction")
983     if $self->{transaction_depth} == 0;
984
985   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
986     unless $self->can('_svp_begin');
987
988   push @{ $self->{savepoints} }, $name;
989
990   $self->debugobj->svp_begin($name) if $self->debug;
991
992   return $self->_svp_begin($name);
993 }
994
995 sub svp_release {
996   my ($self, $name) = @_;
997
998   $self->throw_exception ("You can't use savepoints outside a transaction")
999     if $self->{transaction_depth} == 0;
1000
1001   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1002     unless $self->can('_svp_release');
1003
1004   if (defined $name) {
1005     $self->throw_exception ("Savepoint '$name' does not exist")
1006       unless grep { $_ eq $name } @{ $self->{savepoints} };
1007
1008     # Dig through the stack until we find the one we are releasing.  This keeps
1009     # the stack up to date.
1010     my $svp;
1011
1012     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1013   } else {
1014     $name = pop @{ $self->{savepoints} };
1015   }
1016
1017   $self->debugobj->svp_release($name) if $self->debug;
1018
1019   return $self->_svp_release($name);
1020 }
1021
1022 sub svp_rollback {
1023   my ($self, $name) = @_;
1024
1025   $self->throw_exception ("You can't use savepoints outside a transaction")
1026     if $self->{transaction_depth} == 0;
1027
1028   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1029     unless $self->can('_svp_rollback');
1030
1031   if (defined $name) {
1032       # If they passed us a name, verify that it exists in the stack
1033       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1034           $self->throw_exception("Savepoint '$name' does not exist!");
1035       }
1036
1037       # Dig through the stack until we find the one we are releasing.  This keeps
1038       # the stack up to date.
1039       while(my $s = pop(@{ $self->{savepoints} })) {
1040           last if($s eq $name);
1041       }
1042       # Add the savepoint back to the stack, as a rollback doesn't remove the
1043       # named savepoint, only everything after it.
1044       push(@{ $self->{savepoints} }, $name);
1045   } else {
1046       # We'll assume they want to rollback to the last savepoint
1047       $name = $self->{savepoints}->[-1];
1048   }
1049
1050   $self->debugobj->svp_rollback($name) if $self->debug;
1051
1052   return $self->_svp_rollback($name);
1053 }
1054
1055 sub _svp_generate_name {
1056     my ($self) = @_;
1057
1058     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1059 }
1060
1061 sub txn_begin {
1062   my $self = shift;
1063   if($self->{transaction_depth} == 0) {
1064     $self->debugobj->txn_begin()
1065       if $self->debug;
1066
1067     # being here implies we have AutoCommit => 1
1068     # if the user is utilizing txn_do - good for
1069     # him, otherwise we need to ensure that the
1070     # $dbh is healthy on BEGIN
1071     my $dbh_method = $self->{_in_dbh_do} ? '_dbh' : 'dbh';
1072     $self->$dbh_method->begin_work;
1073
1074   } elsif ($self->auto_savepoint) {
1075     $self->svp_begin;
1076   }
1077   $self->{transaction_depth}++;
1078 }
1079
1080 sub txn_commit {
1081   my $self = shift;
1082   if ($self->{transaction_depth} == 1) {
1083     my $dbh = $self->_dbh;
1084     $self->debugobj->txn_commit()
1085       if ($self->debug);
1086     $dbh->commit;
1087     $self->{transaction_depth} = 0
1088       if $self->_dbh_autocommit;
1089   }
1090   elsif($self->{transaction_depth} > 1) {
1091     $self->{transaction_depth}--;
1092     $self->svp_release
1093       if $self->auto_savepoint;
1094   }
1095 }
1096
1097 sub txn_rollback {
1098   my $self = shift;
1099   my $dbh = $self->_dbh;
1100   eval {
1101     if ($self->{transaction_depth} == 1) {
1102       $self->debugobj->txn_rollback()
1103         if ($self->debug);
1104       $self->{transaction_depth} = 0
1105         if $self->_dbh_autocommit;
1106       $dbh->rollback;
1107     }
1108     elsif($self->{transaction_depth} > 1) {
1109       $self->{transaction_depth}--;
1110       if ($self->auto_savepoint) {
1111         $self->svp_rollback;
1112         $self->svp_release;
1113       }
1114     }
1115     else {
1116       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1117     }
1118   };
1119   if ($@) {
1120     my $error = $@;
1121     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1122     $error =~ /$exception_class/ and $self->throw_exception($error);
1123     # ensure that a failed rollback resets the transaction depth
1124     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1125     $self->throw_exception($error);
1126   }
1127 }
1128
1129 # This used to be the top-half of _execute.  It was split out to make it
1130 #  easier to override in NoBindVars without duping the rest.  It takes up
1131 #  all of _execute's args, and emits $sql, @bind.
1132 sub _prep_for_execute {
1133   my ($self, $op, $extra_bind, $ident, $args) = @_;
1134
1135   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1136     $ident = $ident->from();
1137   }
1138
1139   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1140
1141   unshift(@bind,
1142     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1143       if $extra_bind;
1144   return ($sql, \@bind);
1145 }
1146
1147
1148 sub _fix_bind_params {
1149     my ($self, @bind) = @_;
1150
1151     ### Turn @bind from something like this:
1152     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1153     ### to this:
1154     ###   ( "'1'", "'1'", "'3'" )
1155     return
1156         map {
1157             if ( defined( $_ && $_->[1] ) ) {
1158                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1159             }
1160             else { q{'NULL'}; }
1161         } @bind;
1162 }
1163
1164 sub _query_start {
1165     my ( $self, $sql, @bind ) = @_;
1166
1167     if ( $self->debug ) {
1168         @bind = $self->_fix_bind_params(@bind);
1169
1170         $self->debugobj->query_start( $sql, @bind );
1171     }
1172 }
1173
1174 sub _query_end {
1175     my ( $self, $sql, @bind ) = @_;
1176
1177     if ( $self->debug ) {
1178         @bind = $self->_fix_bind_params(@bind);
1179         $self->debugobj->query_end( $sql, @bind );
1180     }
1181 }
1182
1183 sub _dbh_execute {
1184   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1185
1186   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1187
1188   $self->_query_start( $sql, @$bind );
1189
1190   my $sth = $self->sth($sql,$op);
1191
1192   my $placeholder_index = 1;
1193
1194   foreach my $bound (@$bind) {
1195     my $attributes = {};
1196     my($column_name, @data) = @$bound;
1197
1198     if ($bind_attributes) {
1199       $attributes = $bind_attributes->{$column_name}
1200       if defined $bind_attributes->{$column_name};
1201     }
1202
1203     foreach my $data (@data) {
1204       my $ref = ref $data;
1205       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1206
1207       $sth->bind_param($placeholder_index, $data, $attributes);
1208       $placeholder_index++;
1209     }
1210   }
1211
1212   # Can this fail without throwing an exception anyways???
1213   my $rv = $sth->execute();
1214   $self->throw_exception($sth->errstr) if !$rv;
1215
1216   $self->_query_end( $sql, @$bind );
1217
1218   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1219 }
1220
1221 sub _execute {
1222     my $self = shift;
1223     $self->dbh_do('_dbh_execute', @_)
1224 }
1225
1226 sub insert {
1227   my ($self, $source, $to_insert) = @_;
1228
1229 # redispatch to insert method of storage we reblessed into, if necessary
1230   if (not $self->_driver_determined) {
1231     $self->_determine_driver;
1232     goto $self->can('insert');
1233   }
1234
1235   my $ident = $source->from;
1236   my $bind_attributes = $self->source_bind_attributes($source);
1237
1238   my $updated_cols = {};
1239
1240   foreach my $col ( $source->columns ) {
1241     if ( !defined $to_insert->{$col} ) {
1242       my $col_info = $source->column_info($col);
1243
1244       if ( $col_info->{auto_nextval} ) {
1245         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1246           'nextval',
1247           $col_info->{sequence} ||
1248             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
1249         );
1250       }
1251     }
1252   }
1253
1254   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1255
1256   return $updated_cols;
1257 }
1258
1259 ## Still not quite perfect, and EXPERIMENTAL
1260 ## Currently it is assumed that all values passed will be "normal", i.e. not
1261 ## scalar refs, or at least, all the same type as the first set, the statement is
1262 ## only prepped once.
1263 sub insert_bulk {
1264   my ($self, $source, $cols, $data) = @_;
1265   my %colvalues;
1266   my $table = $source->from;
1267   @colvalues{@$cols} = (0..$#$cols);
1268   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1269
1270   $self->_determine_driver;
1271
1272   $self->_query_start( $sql, @bind );
1273   my $sth = $self->sth($sql);
1274
1275 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1276
1277   ## This must be an arrayref, else nothing works!
1278   my $tuple_status = [];
1279
1280   ## Get the bind_attributes, if any exist
1281   my $bind_attributes = $self->source_bind_attributes($source);
1282
1283   ## Bind the values and execute
1284   my $placeholder_index = 1;
1285
1286   foreach my $bound (@bind) {
1287
1288     my $attributes = {};
1289     my ($column_name, $data_index) = @$bound;
1290
1291     if( $bind_attributes ) {
1292       $attributes = $bind_attributes->{$column_name}
1293       if defined $bind_attributes->{$column_name};
1294     }
1295
1296     my @data = map { $_->[$data_index] } @$data;
1297
1298     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1299     $placeholder_index++;
1300   }
1301   my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
1302   if (my $err = $@) {
1303     my $i = 0;
1304     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1305
1306     $self->throw_exception($sth->errstr || "Unexpected populate error: $err")
1307       if ($i > $#$tuple_status);
1308
1309     require Data::Dumper;
1310     local $Data::Dumper::Terse = 1;
1311     local $Data::Dumper::Indent = 1;
1312     local $Data::Dumper::Useqq = 1;
1313     local $Data::Dumper::Quotekeys = 0;
1314
1315     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1316       $tuple_status->[$i][1],
1317       Data::Dumper::Dumper(
1318         { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) }
1319       ),
1320     );
1321   }
1322   $self->throw_exception($sth->errstr) if !$rv;
1323
1324   $self->_query_end( $sql, @bind );
1325   return (wantarray ? ($rv, $sth, @bind) : $rv);
1326 }
1327
1328 sub update {
1329   my $self = shift @_;
1330   my $source = shift @_;
1331   $self->_determine_driver;
1332   my $bind_attributes = $self->source_bind_attributes($source);
1333
1334   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1335 }
1336
1337
1338 sub delete {
1339   my $self = shift @_;
1340   my $source = shift @_;
1341   $self->_determine_driver;
1342   my $bind_attrs = $self->source_bind_attributes($source);
1343
1344   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1345 }
1346
1347 # We were sent here because the $rs contains a complex search
1348 # which will require a subquery to select the correct rows
1349 # (i.e. joined or limited resultsets)
1350 #
1351 # Genarating a single PK column subquery is trivial and supported
1352 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1353 # Look at _multipk_update_delete()
1354 sub _subq_update_delete {
1355   my $self = shift;
1356   my ($rs, $op, $values) = @_;
1357
1358   my $rsrc = $rs->result_source;
1359
1360   # we already check this, but double check naively just in case. Should be removed soon
1361   my $sel = $rs->_resolved_attrs->{select};
1362   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1363   my @pcols = $rsrc->primary_columns;
1364   if (@$sel != @pcols) {
1365     $self->throw_exception (
1366       'Subquery update/delete can not be called on resultsets selecting a'
1367      .' number of columns different than the number of primary keys'
1368     );
1369   }
1370
1371   if (@pcols == 1) {
1372     return $self->$op (
1373       $rsrc,
1374       $op eq 'update' ? $values : (),
1375       { $pcols[0] => { -in => $rs->as_query } },
1376     );
1377   }
1378
1379   else {
1380     return $self->_multipk_update_delete (@_);
1381   }
1382 }
1383
1384 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1385 # resultset update/delete involving subqueries. So by default resort
1386 # to simple (and inefficient) delete_all style per-row opearations,
1387 # while allowing specific storages to override this with a faster
1388 # implementation.
1389 #
1390 sub _multipk_update_delete {
1391   return shift->_per_row_update_delete (@_);
1392 }
1393
1394 # This is the default loop used to delete/update rows for multi PK
1395 # resultsets, and used by mysql exclusively (because it can't do anything
1396 # else).
1397 #
1398 # We do not use $row->$op style queries, because resultset update/delete
1399 # is not expected to cascade (this is what delete_all/update_all is for).
1400 #
1401 # There should be no race conditions as the entire operation is rolled
1402 # in a transaction.
1403 #
1404 sub _per_row_update_delete {
1405   my $self = shift;
1406   my ($rs, $op, $values) = @_;
1407
1408   my $rsrc = $rs->result_source;
1409   my @pcols = $rsrc->primary_columns;
1410
1411   my $guard = $self->txn_scope_guard;
1412
1413   # emulate the return value of $sth->execute for non-selects
1414   my $row_cnt = '0E0';
1415
1416   my $subrs_cur = $rs->cursor;
1417   while (my @pks = $subrs_cur->next) {
1418
1419     my $cond;
1420     for my $i (0.. $#pcols) {
1421       $cond->{$pcols[$i]} = $pks[$i];
1422     }
1423
1424     $self->$op (
1425       $rsrc,
1426       $op eq 'update' ? $values : (),
1427       $cond,
1428     );
1429
1430     $row_cnt++;
1431   }
1432
1433   $guard->commit;
1434
1435   return $row_cnt;
1436 }
1437
1438 sub _select {
1439   my $self = shift;
1440
1441   # localization is neccessary as
1442   # 1) there is no infrastructure to pass this around before SQLA2
1443   # 2) _select_args sets it and _prep_for_execute consumes it
1444   my $sql_maker = $self->sql_maker;
1445   local $sql_maker->{_dbic_rs_attrs};
1446
1447   return $self->_execute($self->_select_args(@_));
1448 }
1449
1450 sub _select_args_to_query {
1451   my $self = shift;
1452
1453   # localization is neccessary as
1454   # 1) there is no infrastructure to pass this around before SQLA2
1455   # 2) _select_args sets it and _prep_for_execute consumes it
1456   my $sql_maker = $self->sql_maker;
1457   local $sql_maker->{_dbic_rs_attrs};
1458
1459   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1460   #  = $self->_select_args($ident, $select, $cond, $attrs);
1461   my ($op, $bind, $ident, $bind_attrs, @args) =
1462     $self->_select_args(@_);
1463
1464   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1465   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1466   $prepared_bind ||= [];
1467
1468   return wantarray
1469     ? ($sql, $prepared_bind, $bind_attrs)
1470     : \[ "($sql)", @$prepared_bind ]
1471   ;
1472 }
1473
1474 sub _select_args {
1475   my ($self, $ident, $select, $where, $attrs) = @_;
1476
1477   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1478
1479   my $sql_maker = $self->sql_maker;
1480   $sql_maker->{_dbic_rs_attrs} = {
1481     %$attrs,
1482     select => $select,
1483     from => $ident,
1484     where => $where,
1485     $rs_alias
1486       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1487       : ()
1488     ,
1489   };
1490
1491   # calculate bind_attrs before possible $ident mangling
1492   my $bind_attrs = {};
1493   for my $alias (keys %$alias2source) {
1494     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1495     for my $col (keys %$bindtypes) {
1496
1497       my $fqcn = join ('.', $alias, $col);
1498       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1499
1500       # Unqialified column names are nice, but at the same time can be
1501       # rather ambiguous. What we do here is basically go along with
1502       # the loop, adding an unqualified column slot to $bind_attrs,
1503       # alongside the fully qualified name. As soon as we encounter
1504       # another column by that name (which would imply another table)
1505       # we unset the unqualified slot and never add any info to it
1506       # to avoid erroneous type binding. If this happens the users
1507       # only choice will be to fully qualify his column name
1508
1509       if (exists $bind_attrs->{$col}) {
1510         $bind_attrs->{$col} = {};
1511       }
1512       else {
1513         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1514       }
1515     }
1516   }
1517
1518   # adjust limits
1519   if (
1520     $attrs->{software_limit}
1521       ||
1522     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1523   ) {
1524     $attrs->{software_limit} = 1;
1525   }
1526   else {
1527     $self->throw_exception("rows attribute must be positive if present")
1528       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1529
1530     # MySQL actually recommends this approach.  I cringe.
1531     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1532   }
1533
1534   my @limit;
1535
1536   # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
1537   # otherwise delegate the limiting to the storage, unless software limit was requested
1538   if (
1539     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1540        ||
1541     ( $attrs->{group_by} && @{$attrs->{group_by}} &&
1542       $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
1543   ) {
1544     ($ident, $select, $where, $attrs)
1545       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1546   }
1547   elsif (! $attrs->{software_limit} ) {
1548     push @limit, $attrs->{rows}, $attrs->{offset};
1549   }
1550
1551 ###
1552   # This would be the point to deflate anything found in $where
1553   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1554   # expect a row object. And all we have is a resultsource (it is trivial
1555   # to extract deflator coderefs via $alias2source above).
1556   #
1557   # I don't see a way forward other than changing the way deflators are
1558   # invoked, and that's just bad...
1559 ###
1560
1561   my $order = { map
1562     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1563     (qw/order_by group_by having/ )
1564   };
1565
1566   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1567 }
1568
1569 #
1570 # This is the code producing joined subqueries like:
1571 # SELECT me.*, other.* FROM ( SELECT me.* FROM ... ) JOIN other ON ... 
1572 #
1573 sub _adjust_select_args_for_complex_prefetch {
1574   my ($self, $from, $select, $where, $attrs) = @_;
1575
1576   $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
1577     if (ref $from ne 'ARRAY');
1578
1579   # copies for mangling
1580   $from = [ @$from ];
1581   $select = [ @$select ];
1582   $attrs = { %$attrs };
1583
1584   # separate attributes
1585   my $sub_attrs = { %$attrs };
1586   delete $attrs->{$_} for qw/where bind rows offset group_by having/;
1587   delete $sub_attrs->{$_} for qw/for collapse _prefetch_select _collapse_order_by select as/;
1588
1589   my $select_root_alias = $attrs->{alias};
1590   my $sql_maker = $self->sql_maker;
1591
1592   # create subquery select list - consider only stuff *not* brought in by the prefetch
1593   my $sub_select = [];
1594   my $sub_group_by;
1595   for my $i (0 .. @{$attrs->{select}} - @{$attrs->{_prefetch_select}} - 1) {
1596     my $sel = $attrs->{select}[$i];
1597
1598     # alias any functions to the dbic-side 'as' label
1599     # adjust the outer select accordingly
1600     if (ref $sel eq 'HASH' && !$sel->{-select}) {
1601       $sel = { -select => $sel, -as => $attrs->{as}[$i] };
1602       $select->[$i] = join ('.', $attrs->{alias}, ($attrs->{as}[$i] || "select_$i") );
1603     }
1604
1605     push @$sub_select, $sel;
1606   }
1607
1608   # bring over all non-collapse-induced order_by into the inner query (if any)
1609   # the outer one will have to keep them all
1610   delete $sub_attrs->{order_by};
1611   if (my $ord_cnt = @{$attrs->{order_by}} - @{$attrs->{_collapse_order_by}} ) {
1612     $sub_attrs->{order_by} = [
1613       @{$attrs->{order_by}}[ 0 .. $ord_cnt - 1]
1614     ];
1615   }
1616
1617   # mangle {from}, keep in mind that $from is "headless" from here on
1618   my $join_root = shift @$from;
1619
1620   my %inner_joins;
1621   my %join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
1622
1623   # in complex search_related chains $select_root_alias may *not* be
1624   # 'me' so always include it in the inner join
1625   $inner_joins{$select_root_alias} = 1 if ($join_root->{-alias} ne $select_root_alias);
1626
1627
1628   # decide which parts of the join will remain on the inside
1629   #
1630   # this is not a very viable optimisation, but it was written
1631   # before I realised this, so might as well remain. We can throw
1632   # away _any_ branches of the join tree that are:
1633   # 1) not mentioned in the condition/order
1634   # 2) left-join leaves (or left-join leaf chains)
1635   # Most of the join conditions will not satisfy this, but for real
1636   # complex queries some might, and we might make some RDBMS happy.
1637   #
1638   #
1639   # since we do not have introspectable SQLA, we fall back to ugly
1640   # scanning of raw SQL for WHERE, and for pieces of ORDER BY
1641   # in order to determine what goes into %inner_joins
1642   # It may not be very efficient, but it's a reasonable stop-gap
1643   {
1644     # produce stuff unquoted, so it can be scanned
1645     local $sql_maker->{quote_char};
1646     my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1647     $sep = "\Q$sep\E";
1648
1649     my @order_by = (map
1650       { ref $_ ? $_->[0] : $_ }
1651       $sql_maker->_order_by_chunks ($sub_attrs->{order_by})
1652     );
1653
1654     my $where_sql = $sql_maker->where ($where);
1655     my $select_sql = $sql_maker->_recurse_fields ($sub_select);
1656
1657     # sort needed joins
1658     for my $alias (keys %join_info) {
1659
1660       # any table alias found on a column name in where or order_by
1661       # gets included in %inner_joins
1662       # Also any parent joins that are needed to reach this particular alias
1663       for my $piece ($select_sql, $where_sql, @order_by ) {
1664         if ($piece =~ /\b $alias $sep/x) {
1665           $inner_joins{$alias} = 1;
1666         }
1667       }
1668     }
1669   }
1670
1671   # scan for non-leaf/non-left joins and mark as needed
1672   # also mark all ancestor joins that are needed to reach this particular alias
1673   # (e.g.  join => { cds => 'tracks' } - tracks will bring cds too )
1674   #
1675   # traverse by the size of the -join_path i.e. reverse depth first
1676   for my $alias (sort { @{$join_info{$b}{-join_path}} <=> @{$join_info{$a}{-join_path}} } (keys %join_info) ) {
1677
1678     my $j = $join_info{$alias};
1679     $inner_joins{$alias} = 1 if (! $j->{-join_type} || ($j->{-join_type} !~ /^left$/i) );
1680
1681     if ($inner_joins{$alias}) {
1682       $inner_joins{$_} = 1 for (@{$j->{-join_path}});
1683     }
1684   }
1685
1686   # construct the inner $from for the subquery
1687   my $inner_from = [ $join_root ];
1688   for my $j (@$from) {
1689     push @$inner_from, $j if $inner_joins{$j->[0]{-alias}};
1690   }
1691
1692   # if a multi-type join was needed in the subquery ("multi" is indicated by
1693   # presence in {collapse}) - add a group_by to simulate the collapse in the subq
1694   unless ($sub_attrs->{group_by}) {
1695     for my $alias (keys %inner_joins) {
1696
1697       # the dot comes from some weirdness in collapse
1698       # remove after the rewrite
1699       if ($attrs->{collapse}{".$alias"}) {
1700         $sub_attrs->{group_by} ||= $sub_select;
1701         last;
1702       }
1703     }
1704   }
1705
1706   # generate the subquery
1707   my $subq = $self->_select_args_to_query (
1708     $inner_from,
1709     $sub_select,
1710     $where,
1711     $sub_attrs
1712   );
1713   my $subq_joinspec = {
1714     -alias => $select_root_alias,
1715     -source_handle => $join_root->{-source_handle},
1716     $select_root_alias => $subq,
1717   };
1718
1719   # Generate a new from (really just replace the join slot with the subquery)
1720   # Before we would start the outer chain from the subquery itself (i.e.
1721   # SELECT ... FROM (SELECT ... ) alias JOIN ..., but this turned out to be
1722   # a bad idea for search_related, as the root of the chain was effectively
1723   # lost (i.e. $artist_rs->search_related ('cds'... ) would result in alias
1724   # of 'cds', which would prevent from doing things like order_by artist.*)
1725   # See t/prefetch/via_search_related.t for a better idea
1726   my @outer_from;
1727   if ($join_root->{-alias} eq $select_root_alias) { # just swap the root part and we're done
1728     @outer_from = (
1729       $subq_joinspec,
1730       @$from,
1731     )
1732   }
1733   else {  # this is trickier
1734     @outer_from = ($join_root);
1735
1736     for my $j (@$from) {
1737       if ($j->[0]{-alias} eq $select_root_alias) {
1738         push @outer_from, [
1739           $subq_joinspec,
1740           @{$j}[1 .. $#$j],
1741         ];
1742       }
1743       else {
1744         push @outer_from, $j;
1745       }
1746     }
1747   }
1748
1749   # This is totally horrific - the $where ends up in both the inner and outer query
1750   # Unfortunately not much can be done until SQLA2 introspection arrives, and even
1751   # then if where conditions apply to the *right* side of the prefetch, you may have
1752   # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
1753   # the outer select to exclude joins you didin't want in the first place
1754   #
1755   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
1756   return (\@outer_from, $select, $where, $attrs);
1757 }
1758
1759 sub _resolve_ident_sources {
1760   my ($self, $ident) = @_;
1761
1762   my $alias2source = {};
1763   my $rs_alias;
1764
1765   # the reason this is so contrived is that $ident may be a {from}
1766   # structure, specifying multiple tables to join
1767   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1768     # this is compat mode for insert/update/delete which do not deal with aliases
1769     $alias2source->{me} = $ident;
1770     $rs_alias = 'me';
1771   }
1772   elsif (ref $ident eq 'ARRAY') {
1773
1774     for (@$ident) {
1775       my $tabinfo;
1776       if (ref $_ eq 'HASH') {
1777         $tabinfo = $_;
1778         $rs_alias = $tabinfo->{-alias};
1779       }
1780       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
1781         $tabinfo = $_->[0];
1782       }
1783
1784       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
1785         if ($tabinfo->{-source_handle});
1786     }
1787   }
1788
1789   return ($alias2source, $rs_alias);
1790 }
1791
1792 # Takes $ident, \@column_names
1793 #
1794 # returns { $column_name => \%column_info, ... }
1795 # also note: this adds -result_source => $rsrc to the column info
1796 #
1797 # usage:
1798 #   my $col_sources = $self->_resolve_column_info($ident, @column_names);
1799 sub _resolve_column_info {
1800   my ($self, $ident, $colnames) = @_;
1801   my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
1802
1803   my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1804   $sep = "\Q$sep\E";
1805
1806   my (%return, %seen_cols);
1807
1808   # compile a global list of column names, to be able to properly
1809   # disambiguate unqualified column names (if at all possible)
1810   for my $alias (keys %$alias2src) {
1811     my $rsrc = $alias2src->{$alias};
1812     for my $colname ($rsrc->columns) {
1813       push @{$seen_cols{$colname}}, $alias;
1814     }
1815   }
1816
1817   COLUMN:
1818   foreach my $col (@$colnames) {
1819     my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
1820
1821     unless ($alias) {
1822       # see if the column was seen exactly once (so we know which rsrc it came from)
1823       if ($seen_cols{$colname} and @{$seen_cols{$colname}} == 1) {
1824         $alias = $seen_cols{$colname}[0];
1825       }
1826       else {
1827         next COLUMN;
1828       }
1829     }
1830
1831     my $rsrc = $alias2src->{$alias};
1832     $return{$col} = $rsrc && {
1833       %{$rsrc->column_info($colname)},
1834       -result_source => $rsrc,
1835       -source_alias => $alias,
1836     };
1837   }
1838
1839   return \%return;
1840 }
1841
1842 # Returns a counting SELECT for a simple count
1843 # query. Abstracted so that a storage could override
1844 # this to { count => 'firstcol' } or whatever makes
1845 # sense as a performance optimization
1846 sub _count_select {
1847   #my ($self, $source, $rs_attrs) = @_;
1848   return { count => '*' };
1849 }
1850
1851 # Returns a SELECT which will end up in the subselect
1852 # There may or may not be a group_by, as the subquery
1853 # might have been called to accomodate a limit
1854 #
1855 # Most databases would be happy with whatever ends up
1856 # here, but some choke in various ways.
1857 #
1858 sub _subq_count_select {
1859   my ($self, $source, $rs_attrs) = @_;
1860   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
1861
1862   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1863   return @pcols ? \@pcols : [ 1 ];
1864 }
1865
1866
1867 sub source_bind_attributes {
1868   my ($self, $source) = @_;
1869
1870   my $bind_attributes;
1871   foreach my $column ($source->columns) {
1872
1873     my $data_type = $source->column_info($column)->{data_type} || '';
1874     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1875      if $data_type;
1876   }
1877
1878   return $bind_attributes;
1879 }
1880
1881 =head2 select
1882
1883 =over 4
1884
1885 =item Arguments: $ident, $select, $condition, $attrs
1886
1887 =back
1888
1889 Handle a SQL select statement.
1890
1891 =cut
1892
1893 sub select {
1894   my $self = shift;
1895   my ($ident, $select, $condition, $attrs) = @_;
1896   return $self->cursor_class->new($self, \@_, $attrs);
1897 }
1898
1899 sub select_single {
1900   my $self = shift;
1901   my ($rv, $sth, @bind) = $self->_select(@_);
1902   my @row = $sth->fetchrow_array;
1903   my @nextrow = $sth->fetchrow_array if @row;
1904   if(@row && @nextrow) {
1905     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1906   }
1907   # Need to call finish() to work round broken DBDs
1908   $sth->finish();
1909   return @row;
1910 }
1911
1912 =head2 sth
1913
1914 =over 4
1915
1916 =item Arguments: $sql
1917
1918 =back
1919
1920 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1921
1922 =cut
1923
1924 sub _dbh_sth {
1925   my ($self, $dbh, $sql) = @_;
1926
1927   # 3 is the if_active parameter which avoids active sth re-use
1928   my $sth = $self->disable_sth_caching
1929     ? $dbh->prepare($sql)
1930     : $dbh->prepare_cached($sql, {}, 3);
1931
1932   # XXX You would think RaiseError would make this impossible,
1933   #  but apparently that's not true :(
1934   $self->throw_exception($dbh->errstr) if !$sth;
1935
1936   $sth;
1937 }
1938
1939 sub sth {
1940   my ($self, $sql) = @_;
1941   $self->dbh_do('_dbh_sth', $sql);
1942 }
1943
1944 sub _dbh_columns_info_for {
1945   my ($self, $dbh, $table) = @_;
1946
1947   if ($dbh->can('column_info')) {
1948     my %result;
1949     eval {
1950       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1951       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1952       $sth->execute();
1953       while ( my $info = $sth->fetchrow_hashref() ){
1954         my %column_info;
1955         $column_info{data_type}   = $info->{TYPE_NAME};
1956         $column_info{size}      = $info->{COLUMN_SIZE};
1957         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1958         $column_info{default_value} = $info->{COLUMN_DEF};
1959         my $col_name = $info->{COLUMN_NAME};
1960         $col_name =~ s/^\"(.*)\"$/$1/;
1961
1962         $result{$col_name} = \%column_info;
1963       }
1964     };
1965     return \%result if !$@ && scalar keys %result;
1966   }
1967
1968   my %result;
1969   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1970   $sth->execute;
1971   my @columns = @{$sth->{NAME_lc}};
1972   for my $i ( 0 .. $#columns ){
1973     my %column_info;
1974     $column_info{data_type} = $sth->{TYPE}->[$i];
1975     $column_info{size} = $sth->{PRECISION}->[$i];
1976     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1977
1978     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1979       $column_info{data_type} = $1;
1980       $column_info{size}    = $2;
1981     }
1982
1983     $result{$columns[$i]} = \%column_info;
1984   }
1985   $sth->finish;
1986
1987   foreach my $col (keys %result) {
1988     my $colinfo = $result{$col};
1989     my $type_num = $colinfo->{data_type};
1990     my $type_name;
1991     if(defined $type_num && $dbh->can('type_info')) {
1992       my $type_info = $dbh->type_info($type_num);
1993       $type_name = $type_info->{TYPE_NAME} if $type_info;
1994       $colinfo->{data_type} = $type_name if $type_name;
1995     }
1996   }
1997
1998   return \%result;
1999 }
2000
2001 sub columns_info_for {
2002   my ($self, $table) = @_;
2003   $self->dbh_do('_dbh_columns_info_for', $table);
2004 }
2005
2006 =head2 last_insert_id
2007
2008 Return the row id of the last insert.
2009
2010 =cut
2011
2012 sub _dbh_last_insert_id {
2013     # All Storage's need to register their own _dbh_last_insert_id
2014     # the old SQLite-based method was highly inappropriate
2015
2016     my $self = shift;
2017     my $class = ref $self;
2018     $self->throw_exception (<<EOE);
2019
2020 No _dbh_last_insert_id() method found in $class.
2021 Since the method of obtaining the autoincrement id of the last insert
2022 operation varies greatly between different databases, this method must be
2023 individually implemented for every storage class.
2024 EOE
2025 }
2026
2027 sub last_insert_id {
2028   my $self = shift;
2029   $self->dbh_do('_dbh_last_insert_id', @_);
2030 }
2031
2032 =head2 sqlt_type
2033
2034 Returns the database driver name.
2035
2036 =cut
2037
2038 sub sqlt_type { shift->_get_dbh->{Driver}->{Name} }
2039
2040 =head2 bind_attribute_by_data_type
2041
2042 Given a datatype from column info, returns a database specific bind
2043 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2044 let the database planner just handle it.
2045
2046 Generally only needed for special case column types, like bytea in postgres.
2047
2048 =cut
2049
2050 sub bind_attribute_by_data_type {
2051     return;
2052 }
2053
2054 =head2 is_datatype_numeric
2055
2056 Given a datatype from column_info, returns a boolean value indicating if
2057 the current RDBMS considers it a numeric value. This controls how
2058 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2059 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2060 be performed instead of the usual C<eq>.
2061
2062 =cut
2063
2064 sub is_datatype_numeric {
2065   my ($self, $dt) = @_;
2066
2067   return 0 unless $dt;
2068
2069   return $dt =~ /^ (?:
2070     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2071   ) $/ix;
2072 }
2073
2074
2075 =head2 create_ddl_dir (EXPERIMENTAL)
2076
2077 =over 4
2078
2079 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2080
2081 =back
2082
2083 Creates a SQL file based on the Schema, for each of the specified
2084 database engines in C<\@databases> in the given directory.
2085 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2086
2087 Given a previous version number, this will also create a file containing
2088 the ALTER TABLE statements to transform the previous schema into the
2089 current one. Note that these statements may contain C<DROP TABLE> or
2090 C<DROP COLUMN> statements that can potentially destroy data.
2091
2092 The file names are created using the C<ddl_filename> method below, please
2093 override this method in your schema if you would like a different file
2094 name format. For the ALTER file, the same format is used, replacing
2095 $version in the name with "$preversion-$version".
2096
2097 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2098 The most common value for this would be C<< { add_drop_table => 1 } >>
2099 to have the SQL produced include a C<DROP TABLE> statement for each table
2100 created. For quoting purposes supply C<quote_table_names> and
2101 C<quote_field_names>.
2102
2103 If no arguments are passed, then the following default values are assumed:
2104
2105 =over 4
2106
2107 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2108
2109 =item version    - $schema->schema_version
2110
2111 =item directory  - './'
2112
2113 =item preversion - <none>
2114
2115 =back
2116
2117 By default, C<\%sqlt_args> will have
2118
2119  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2120
2121 merged with the hash passed in. To disable any of those features, pass in a
2122 hashref like the following
2123
2124  { ignore_constraint_names => 0, # ... other options }
2125
2126
2127 Note that this feature is currently EXPERIMENTAL and may not work correctly
2128 across all databases, or fully handle complex relationships.
2129
2130 WARNING: Please check all SQL files created, before applying them.
2131
2132 =cut
2133
2134 sub create_ddl_dir {
2135   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2136
2137   if(!$dir || !-d $dir) {
2138     carp "No directory given, using ./\n";
2139     $dir = "./";
2140   }
2141   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2142   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2143
2144   my $schema_version = $schema->schema_version || '1.x';
2145   $version ||= $schema_version;
2146
2147   $sqltargs = {
2148     add_drop_table => 1,
2149     ignore_constraint_names => 1,
2150     ignore_index_names => 1,
2151     %{$sqltargs || {}}
2152   };
2153
2154   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
2155       . $self->_check_sqlt_message . q{'})
2156           if !$self->_check_sqlt_version;
2157
2158   my $sqlt = SQL::Translator->new( $sqltargs );
2159
2160   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2161   my $sqlt_schema = $sqlt->translate({ data => $schema })
2162     or $self->throw_exception ($sqlt->error);
2163
2164   foreach my $db (@$databases) {
2165     $sqlt->reset();
2166     $sqlt->{schema} = $sqlt_schema;
2167     $sqlt->producer($db);
2168
2169     my $file;
2170     my $filename = $schema->ddl_filename($db, $version, $dir);
2171     if (-e $filename && ($version eq $schema_version )) {
2172       # if we are dumping the current version, overwrite the DDL
2173       carp "Overwriting existing DDL file - $filename";
2174       unlink($filename);
2175     }
2176
2177     my $output = $sqlt->translate;
2178     if(!$output) {
2179       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2180       next;
2181     }
2182     if(!open($file, ">$filename")) {
2183       $self->throw_exception("Can't open $filename for writing ($!)");
2184       next;
2185     }
2186     print $file $output;
2187     close($file);
2188
2189     next unless ($preversion);
2190
2191     require SQL::Translator::Diff;
2192
2193     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2194     if(!-e $prefilename) {
2195       carp("No previous schema file found ($prefilename)");
2196       next;
2197     }
2198
2199     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2200     if(-e $difffile) {
2201       carp("Overwriting existing diff file - $difffile");
2202       unlink($difffile);
2203     }
2204
2205     my $source_schema;
2206     {
2207       my $t = SQL::Translator->new($sqltargs);
2208       $t->debug( 0 );
2209       $t->trace( 0 );
2210
2211       $t->parser( $db )
2212         or $self->throw_exception ($t->error);
2213
2214       my $out = $t->translate( $prefilename )
2215         or $self->throw_exception ($t->error);
2216
2217       $source_schema = $t->schema;
2218
2219       $source_schema->name( $prefilename )
2220         unless ( $source_schema->name );
2221     }
2222
2223     # The "new" style of producers have sane normalization and can support
2224     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2225     # And we have to diff parsed SQL against parsed SQL.
2226     my $dest_schema = $sqlt_schema;
2227
2228     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2229       my $t = SQL::Translator->new($sqltargs);
2230       $t->debug( 0 );
2231       $t->trace( 0 );
2232
2233       $t->parser( $db )
2234         or $self->throw_exception ($t->error);
2235
2236       my $out = $t->translate( $filename )
2237         or $self->throw_exception ($t->error);
2238
2239       $dest_schema = $t->schema;
2240
2241       $dest_schema->name( $filename )
2242         unless $dest_schema->name;
2243     }
2244
2245     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2246                                                   $dest_schema,   $db,
2247                                                   $sqltargs
2248                                                  );
2249     if(!open $file, ">$difffile") {
2250       $self->throw_exception("Can't write to $difffile ($!)");
2251       next;
2252     }
2253     print $file $diff;
2254     close($file);
2255   }
2256 }
2257
2258 =head2 deployment_statements
2259
2260 =over 4
2261
2262 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2263
2264 =back
2265
2266 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2267
2268 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2269 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2270
2271 C<$directory> is used to return statements from files in a previously created
2272 L</create_ddl_dir> directory and is optional. The filenames are constructed
2273 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2274
2275 If no C<$directory> is specified then the statements are constructed on the
2276 fly using L<SQL::Translator> and C<$version> is ignored.
2277
2278 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2279
2280 =cut
2281
2282 sub deployment_statements {
2283   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2284   $type ||= $self->sqlt_type;
2285   $version ||= $schema->schema_version || '1.x';
2286   $dir ||= './';
2287   my $filename = $schema->ddl_filename($type, $version, $dir);
2288   if(-f $filename)
2289   {
2290       my $file;
2291       open($file, "<$filename")
2292         or $self->throw_exception("Can't open $filename ($!)");
2293       my @rows = <$file>;
2294       close($file);
2295       return join('', @rows);
2296   }
2297
2298   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
2299       . $self->_check_sqlt_message . q{'})
2300           if !$self->_check_sqlt_version;
2301
2302   require SQL::Translator::Parser::DBIx::Class;
2303   eval qq{use SQL::Translator::Producer::${type}};
2304   $self->throw_exception($@) if $@;
2305
2306   # sources needs to be a parser arg, but for simplicty allow at top level
2307   # coming in
2308   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2309       if exists $sqltargs->{sources};
2310
2311   my $tr = SQL::Translator->new(%$sqltargs);
2312   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
2313   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
2314 }
2315
2316 sub deploy {
2317   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2318   my $deploy = sub {
2319     my $line = shift;
2320     return if($line =~ /^--/);
2321     return if(!$line);
2322     # next if($line =~ /^DROP/m);
2323     return if($line =~ /^BEGIN TRANSACTION/m);
2324     return if($line =~ /^COMMIT/m);
2325     return if $line =~ /^\s+$/; # skip whitespace only
2326     $self->_query_start($line);
2327     eval {
2328       # do a dbh_do cycle here, as we need some error checking in
2329       # place (even though we will ignore errors)
2330       $self->dbh_do (sub { $_[1]->do($line) });
2331     };
2332     if ($@) {
2333       carp qq{$@ (running "${line}")};
2334     }
2335     $self->_query_end($line);
2336   };
2337   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2338   if (@statements > 1) {
2339     foreach my $statement (@statements) {
2340       $deploy->( $statement );
2341     }
2342   }
2343   elsif (@statements == 1) {
2344     foreach my $line ( split(";\n", $statements[0])) {
2345       $deploy->( $line );
2346     }
2347   }
2348 }
2349
2350 =head2 datetime_parser
2351
2352 Returns the datetime parser class
2353
2354 =cut
2355
2356 sub datetime_parser {
2357   my $self = shift;
2358   return $self->{datetime_parser} ||= do {
2359     $self->_populate_dbh unless $self->_dbh;
2360     $self->build_datetime_parser(@_);
2361   };
2362 }
2363
2364 =head2 datetime_parser_type
2365
2366 Defines (returns) the datetime parser class - currently hardwired to
2367 L<DateTime::Format::MySQL>
2368
2369 =cut
2370
2371 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2372
2373 =head2 build_datetime_parser
2374
2375 See L</datetime_parser>
2376
2377 =cut
2378
2379 sub build_datetime_parser {
2380   my $self = shift;
2381   my $type = $self->datetime_parser_type(@_);
2382   eval "use ${type}";
2383   $self->throw_exception("Couldn't load ${type}: $@") if $@;
2384   return $type;
2385 }
2386
2387 {
2388     my $_check_sqlt_version; # private
2389     my $_check_sqlt_message; # private
2390     sub _check_sqlt_version {
2391         return $_check_sqlt_version if defined $_check_sqlt_version;
2392         eval 'use SQL::Translator "0.09003"';
2393         $_check_sqlt_message = $@ || '';
2394         $_check_sqlt_version = !$@;
2395     }
2396
2397     sub _check_sqlt_message {
2398         _check_sqlt_version if !defined $_check_sqlt_message;
2399         $_check_sqlt_message;
2400     }
2401 }
2402
2403 =head2 is_replicating
2404
2405 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2406 replicate from a master database.  Default is undef, which is the result
2407 returned by databases that don't support replication.
2408
2409 =cut
2410
2411 sub is_replicating {
2412     return;
2413
2414 }
2415
2416 =head2 lag_behind_master
2417
2418 Returns a number that represents a certain amount of lag behind a master db
2419 when a given storage is replicating.  The number is database dependent, but
2420 starts at zero and increases with the amount of lag. Default in undef
2421
2422 =cut
2423
2424 sub lag_behind_master {
2425     return;
2426 }
2427
2428 sub DESTROY {
2429   my $self = shift;
2430   return if !$self->_dbh;
2431   $self->_verify_pid;
2432   $self->_dbh(undef);
2433 }
2434
2435 1;
2436
2437 =head1 USAGE NOTES
2438
2439 =head2 DBIx::Class and AutoCommit
2440
2441 DBIx::Class can do some wonderful magic with handling exceptions,
2442 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2443 (the default) combined with C<txn_do> for transaction support.
2444
2445 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2446 in an assumed transaction between commits, and you're telling us you'd
2447 like to manage that manually.  A lot of the magic protections offered by
2448 this module will go away.  We can't protect you from exceptions due to database
2449 disconnects because we don't know anything about how to restart your
2450 transactions.  You're on your own for handling all sorts of exceptional
2451 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2452 be with raw DBI.
2453
2454
2455 =head1 AUTHORS
2456
2457 Matt S. Trout <mst@shadowcatsystems.co.uk>
2458
2459 Andy Grundman <andy@hybridized.org>
2460
2461 =head1 LICENSE
2462
2463 You may distribute this code under the same terms as Perl itself.
2464
2465 =cut