don't run connection actions if ->_rebless does not connect
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base 'DBIx::Class::Storage';
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16
17 __PACKAGE__->mk_group_accessors('simple' =>
18   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
19      _conn_pid _conn_tid transaction_depth _dbh_autocommit savepoints/
20 );
21
22 # the values for these accessors are picked out (and deleted) from
23 # the attribute hashref passed to connect_info
24 my @storage_options = qw/
25   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
26   disable_sth_caching unsafe auto_savepoint
27 /;
28 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
29
30
31 # default cursor class, overridable in connect_info attributes
32 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
33
34 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
35 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
36
37
38 =head1 NAME
39
40 DBIx::Class::Storage::DBI - DBI storage handler
41
42 =head1 SYNOPSIS
43
44   my $schema = MySchema->connect('dbi:SQLite:my.db');
45
46   $schema->storage->debug(1);
47   $schema->dbh_do("DROP TABLE authors");
48
49   $schema->resultset('Book')->search({
50      written_on => $schema->storage->datetime_parser(DateTime->now)
51   });
52
53 =head1 DESCRIPTION
54
55 This class represents the connection to an RDBMS via L<DBI>.  See
56 L<DBIx::Class::Storage> for general information.  This pod only
57 documents DBI-specific methods and behaviors.
58
59 =head1 METHODS
60
61 =cut
62
63 sub new {
64   my $new = shift->next::method(@_);
65
66   $new->transaction_depth(0);
67   $new->_sql_maker_opts({});
68   $new->{savepoints} = [];
69   $new->{_in_dbh_do} = 0;
70   $new->{_dbh_gen} = 0;
71
72   $new;
73 }
74
75 =head2 connect_info
76
77 This method is normally called by L<DBIx::Class::Schema/connection>, which
78 encapsulates its argument list in an arrayref before passing them here.
79
80 The argument list may contain:
81
82 =over
83
84 =item *
85
86 The same 4-element argument set one would normally pass to
87 L<DBI/connect>, optionally followed by
88 L<extra attributes|/DBIx::Class specific connection attributes>
89 recognized by DBIx::Class:
90
91   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
92
93 =item *
94
95 A single code reference which returns a connected
96 L<DBI database handle|DBI/connect> optionally followed by
97 L<extra attributes|/DBIx::Class specific connection attributes> recognized
98 by DBIx::Class:
99
100   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
101
102 =item *
103
104 A single hashref with all the attributes and the dsn/user/password
105 mixed together:
106
107   $connect_info_args = [{
108     dsn => $dsn,
109     user => $user,
110     password => $pass,
111     %dbi_attributes,
112     %extra_attributes,
113   }];
114
115 This is particularly useful for L<Catalyst> based applications, allowing the
116 following config (L<Config::General> style):
117
118   <Model::DB>
119     schema_class   App::DB
120     <connect_info>
121       dsn          dbi:mysql:database=test
122       user         testuser
123       password     TestPass
124       AutoCommit   1
125     </connect_info>
126   </Model::DB>
127
128 =back
129
130 Please note that the L<DBI> docs recommend that you always explicitly
131 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
132 recommends that it be set to I<1>, and that you perform transactions
133 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
134 to I<1> if you do not do explicitly set it to zero.  This is the default
135 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
136
137 =head3 DBIx::Class specific connection attributes
138
139 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
140 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
141 the following connection options. These options can be mixed in with your other
142 L<DBI> connection attributes, or placed in a seperate hashref
143 (C<\%extra_attributes>) as shown above.
144
145 Every time C<connect_info> is invoked, any previous settings for
146 these options will be cleared before setting the new ones, regardless of
147 whether any options are specified in the new C<connect_info>.
148
149
150 =over
151
152 =item on_connect_do
153
154 Specifies things to do immediately after connecting or re-connecting to
155 the database.  Its value may contain:
156
157 =over
158
159 =item a scalar
160
161 This contains one SQL statement to execute.
162
163 =item an array reference
164
165 This contains SQL statements to execute in order.  Each element contains
166 a string or a code reference that returns a string.
167
168 =item a code reference
169
170 This contains some code to execute.  Unlike code references within an
171 array reference, its return value is ignored.
172
173 =back
174
175 =item on_disconnect_do
176
177 Takes arguments in the same form as L</on_connect_do> and executes them
178 immediately before disconnecting from the database.
179
180 Note, this only runs if you explicitly call L</disconnect> on the
181 storage object.
182
183 =item on_connect_call
184
185 A more generalized form of L</on_connect_do> that calls the specified
186 C<connect_call_METHOD> methods in your storage driver.
187
188   on_connect_do => 'select 1'
189
190 is equivalent to:
191
192   on_connect_call => [ [ do_sql => 'select 1' ] ]
193
194 Its values may contain:
195
196 =over
197
198 =item a scalar
199
200 Will call the C<connect_call_METHOD> method.
201
202 =item a code reference
203
204 Will execute C<< $code->($storage) >>
205
206 =item an array reference
207
208 Each value can be a method name or code reference.
209
210 =item an array of arrays
211
212 For each array, the first item is taken to be the C<connect_call_> method name
213 or code reference, and the rest are parameters to it.
214
215 =back
216
217 Some predefined storage methods you may use:
218
219 =over
220
221 =item do_sql
222
223 Executes a SQL string or a code reference that returns a SQL string. This is
224 what L</on_connect_do> and L</on_disconnect_do> use.
225
226 It can take:
227
228 =over
229
230 =item a scalar
231
232 Will execute the scalar as SQL.
233
234 =item an arrayref
235
236 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
237 attributes hashref and bind values.
238
239 =item a code reference
240
241 Will execute C<< $code->($storage) >> and execute the return array refs as
242 above.
243
244 =back
245
246 =item datetime_setup
247
248 Execute any statements necessary to initialize the database session to return
249 and accept datetime/timestamp values used with
250 L<DBIx::Class::InflateColumn::DateTime>.
251
252 Only necessary for some databases, see your specific storage driver for
253 implementation details.
254
255 =back
256
257 =item on_disconnect_call
258
259 Takes arguments in the same form as L</on_connect_call> and executes them
260 immediately before disconnecting from the database.
261
262 Calls the C<disconnect_call_METHOD> methods as opposed to the
263 C<connect_call_METHOD> methods called by L</on_connect_call>.
264
265 Note, this only runs if you explicitly call L</disconnect> on the
266 storage object.
267
268 =item disable_sth_caching
269
270 If set to a true value, this option will disable the caching of
271 statement handles via L<DBI/prepare_cached>.
272
273 =item limit_dialect
274
275 Sets the limit dialect. This is useful for JDBC-bridge among others
276 where the remote SQL-dialect cannot be determined by the name of the
277 driver alone. See also L<SQL::Abstract::Limit>.
278
279 =item quote_char
280
281 Specifies what characters to use to quote table and column names. If
282 you use this you will want to specify L</name_sep> as well.
283
284 C<quote_char> expects either a single character, in which case is it
285 is placed on either side of the table/column name, or an arrayref of length
286 2 in which case the table/column name is placed between the elements.
287
288 For example under MySQL you should use C<< quote_char => '`' >>, and for
289 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
290
291 =item name_sep
292
293 This only needs to be used in conjunction with C<quote_char>, and is used to
294 specify the charecter that seperates elements (schemas, tables, columns) from
295 each other. In most cases this is simply a C<.>.
296
297 The consequences of not supplying this value is that L<SQL::Abstract>
298 will assume DBIx::Class' uses of aliases to be complete column
299 names. The output will look like I<"me.name"> when it should actually
300 be I<"me"."name">.
301
302 =item unsafe
303
304 This Storage driver normally installs its own C<HandleError>, sets
305 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
306 all database handles, including those supplied by a coderef.  It does this
307 so that it can have consistent and useful error behavior.
308
309 If you set this option to a true value, Storage will not do its usual
310 modifications to the database handle's attributes, and instead relies on
311 the settings in your connect_info DBI options (or the values you set in
312 your connection coderef, in the case that you are connecting via coderef).
313
314 Note that your custom settings can cause Storage to malfunction,
315 especially if you set a C<HandleError> handler that suppresses exceptions
316 and/or disable C<RaiseError>.
317
318 =item auto_savepoint
319
320 If this option is true, L<DBIx::Class> will use savepoints when nesting
321 transactions, making it possible to recover from failure in the inner
322 transaction without having to abort all outer transactions.
323
324 =item cursor_class
325
326 Use this argument to supply a cursor class other than the default
327 L<DBIx::Class::Storage::DBI::Cursor>.
328
329 =back
330
331 Some real-life examples of arguments to L</connect_info> and
332 L<DBIx::Class::Schema/connect>
333
334   # Simple SQLite connection
335   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
336
337   # Connect via subref
338   ->connect_info([ sub { DBI->connect(...) } ]);
339
340   # A bit more complicated
341   ->connect_info(
342     [
343       'dbi:Pg:dbname=foo',
344       'postgres',
345       'my_pg_password',
346       { AutoCommit => 1 },
347       { quote_char => q{"}, name_sep => q{.} },
348     ]
349   );
350
351   # Equivalent to the previous example
352   ->connect_info(
353     [
354       'dbi:Pg:dbname=foo',
355       'postgres',
356       'my_pg_password',
357       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
358     ]
359   );
360
361   # Same, but with hashref as argument
362   # See parse_connect_info for explanation
363   ->connect_info(
364     [{
365       dsn         => 'dbi:Pg:dbname=foo',
366       user        => 'postgres',
367       password    => 'my_pg_password',
368       AutoCommit  => 1,
369       quote_char  => q{"},
370       name_sep    => q{.},
371     }]
372   );
373
374   # Subref + DBIx::Class-specific connection options
375   ->connect_info(
376     [
377       sub { DBI->connect(...) },
378       {
379           quote_char => q{`},
380           name_sep => q{@},
381           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
382           disable_sth_caching => 1,
383       },
384     ]
385   );
386
387
388
389 =cut
390
391 sub connect_info {
392   my ($self, $info_arg) = @_;
393
394   return $self->_connect_info if !$info_arg;
395
396   my @args = @$info_arg;  # take a shallow copy for further mutilation
397   $self->_connect_info([@args]); # copy for _connect_info
398
399
400   # combine/pre-parse arguments depending on invocation style
401
402   my %attrs;
403   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
404     %attrs = %{ $args[1] || {} };
405     @args = $args[0];
406   }
407   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
408     %attrs = %{$args[0]};
409     @args = ();
410     for (qw/password user dsn/) {
411       unshift @args, delete $attrs{$_};
412     }
413   }
414   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
415     %attrs = (
416       % { $args[3] || {} },
417       % { $args[4] || {} },
418     );
419     @args = @args[0,1,2];
420   }
421
422   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
423   #  the new set of options
424   $self->_sql_maker(undef);
425   $self->_sql_maker_opts({});
426
427   if(keys %attrs) {
428     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
429       if(my $value = delete $attrs{$storage_opt}) {
430         $self->$storage_opt($value);
431       }
432     }
433     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
434       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
435         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
436       }
437     }
438   }
439
440   if (ref $args[0] eq 'CODE') {
441     # _connect() never looks past $args[0] in this case
442     %attrs = ()
443   } else {
444     %attrs = (%{ $self->_dbi_connect_attributes }, %attrs);
445   }
446
447   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
448   $self->_connect_info;
449 }
450
451 sub _dbi_connect_attributes {
452   return { AutoCommit => 1 };
453 }
454
455 =head2 on_connect_do
456
457 This method is deprecated in favour of setting via L</connect_info>.
458
459 =cut
460
461 =head2 on_disconnect_do
462
463 This method is deprecated in favour of setting via L</connect_info>.
464
465 =cut
466
467 sub _parse_connect_do {
468   my ($self, $type) = @_;
469
470   my $val = $self->$type;
471   return () if not defined $val;
472
473   my @res;
474
475   if (not ref($val)) {
476     push @res, [ 'do_sql', $val ];
477   } elsif (ref($val) eq 'CODE') {
478     push @res, $val;
479   } elsif (ref($val) eq 'ARRAY') {
480     push @res, map { [ 'do_sql', $_ ] } @$val;
481   } else {
482     $self->throw_exception("Invalid type for $type: ".ref($val));
483   }
484
485   return \@res;
486 }
487
488 =head2 dbh_do
489
490 Arguments: ($subref | $method_name), @extra_coderef_args?
491
492 Execute the given $subref or $method_name using the new exception-based
493 connection management.
494
495 The first two arguments will be the storage object that C<dbh_do> was called
496 on and a database handle to use.  Any additional arguments will be passed
497 verbatim to the called subref as arguments 2 and onwards.
498
499 Using this (instead of $self->_dbh or $self->dbh) ensures correct
500 exception handling and reconnection (or failover in future subclasses).
501
502 Your subref should have no side-effects outside of the database, as
503 there is the potential for your subref to be partially double-executed
504 if the database connection was stale/dysfunctional.
505
506 Example:
507
508   my @stuff = $schema->storage->dbh_do(
509     sub {
510       my ($storage, $dbh, @cols) = @_;
511       my $cols = join(q{, }, @cols);
512       $dbh->selectrow_array("SELECT $cols FROM foo");
513     },
514     @column_list
515   );
516
517 =cut
518
519 sub dbh_do {
520   my $self = shift;
521   my $code = shift;
522
523   my $dbh = $self->_dbh;
524
525   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
526       || $self->{transaction_depth};
527
528   local $self->{_in_dbh_do} = 1;
529
530   my @result;
531   my $want_array = wantarray;
532
533   eval {
534     $self->_verify_pid if $dbh;
535     if(!$self->_dbh) {
536         $self->_populate_dbh;
537         $dbh = $self->_dbh;
538     }
539
540     if($want_array) {
541         @result = $self->$code($dbh, @_);
542     }
543     elsif(defined $want_array) {
544         $result[0] = $self->$code($dbh, @_);
545     }
546     else {
547         $self->$code($dbh, @_);
548     }
549   };
550
551   my $exception = $@;
552   if(!$exception) { return $want_array ? @result : $result[0] }
553
554   $self->throw_exception($exception) if $self->connected;
555
556   # We were not connected - reconnect and retry, but let any
557   #  exception fall right through this time
558   $self->_populate_dbh;
559   $self->$code($self->_dbh, @_);
560 }
561
562 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
563 # It also informs dbh_do to bypass itself while under the direction of txn_do,
564 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
565 sub txn_do {
566   my $self = shift;
567   my $coderef = shift;
568
569   ref $coderef eq 'CODE' or $self->throw_exception
570     ('$coderef must be a CODE reference');
571
572   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
573
574   local $self->{_in_dbh_do} = 1;
575
576   my @result;
577   my $want_array = wantarray;
578
579   my $tried = 0;
580   while(1) {
581     eval {
582       $self->_verify_pid if $self->_dbh;
583       $self->_populate_dbh if !$self->_dbh;
584
585       $self->txn_begin;
586       if($want_array) {
587           @result = $coderef->(@_);
588       }
589       elsif(defined $want_array) {
590           $result[0] = $coderef->(@_);
591       }
592       else {
593           $coderef->(@_);
594       }
595       $self->txn_commit;
596     };
597
598     my $exception = $@;
599     if(!$exception) { return $want_array ? @result : $result[0] }
600
601     if($tried++ > 0 || $self->connected) {
602       eval { $self->txn_rollback };
603       my $rollback_exception = $@;
604       if($rollback_exception) {
605         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
606         $self->throw_exception($exception)  # propagate nested rollback
607           if $rollback_exception =~ /$exception_class/;
608
609         $self->throw_exception(
610           "Transaction aborted: ${exception}. "
611           . "Rollback failed: ${rollback_exception}"
612         );
613       }
614       $self->throw_exception($exception)
615     }
616
617     # We were not connected, and was first try - reconnect and retry
618     # via the while loop
619     $self->_populate_dbh;
620   }
621 }
622
623 =head2 disconnect
624
625 Our C<disconnect> method also performs a rollback first if the
626 database is not in C<AutoCommit> mode.
627
628 =cut
629
630 sub disconnect {
631   my ($self) = @_;
632
633   if( $self->_dbh ) {
634     my @actions;
635
636     push @actions, ( $self->on_disconnect_call || () );
637     push @actions, $self->_parse_connect_do ('on_disconnect_do');
638
639     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
640
641     $self->_dbh->rollback unless $self->_dbh_autocommit;
642     $self->_dbh->disconnect;
643     $self->_dbh(undef);
644     $self->{_dbh_gen}++;
645   }
646 }
647
648 =head2 with_deferred_fk_checks
649
650 =over 4
651
652 =item Arguments: C<$coderef>
653
654 =item Return Value: The return value of $coderef
655
656 =back
657
658 Storage specific method to run the code ref with FK checks deferred or
659 in MySQL's case disabled entirely.
660
661 =cut
662
663 # Storage subclasses should override this
664 sub with_deferred_fk_checks {
665   my ($self, $sub) = @_;
666
667   $sub->();
668 }
669
670 sub connected {
671   my ($self) = @_;
672
673   if(my $dbh = $self->_dbh) {
674       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
675           $self->_dbh(undef);
676           $self->{_dbh_gen}++;
677           return;
678       }
679       else {
680           $self->_verify_pid;
681           return 0 if !$self->_dbh;
682       }
683       return ($dbh->FETCH('Active') && $self->_ping);
684   }
685
686   return 0;
687 }
688
689 sub _ping {
690   my $self = shift;
691
692   my $dbh = $self->_dbh or return 0;
693
694   return $dbh->ping;
695 }
696
697 # handle pid changes correctly
698 #  NOTE: assumes $self->_dbh is a valid $dbh
699 sub _verify_pid {
700   my ($self) = @_;
701
702   return if defined $self->_conn_pid && $self->_conn_pid == $$;
703
704   $self->_dbh->{InactiveDestroy} = 1;
705   $self->_dbh(undef);
706   $self->{_dbh_gen}++;
707
708   return;
709 }
710
711 sub ensure_connected {
712   my ($self) = @_;
713
714   unless ($self->connected) {
715     $self->_populate_dbh;
716   }
717 }
718
719 =head2 dbh
720
721 Returns the dbh - a data base handle of class L<DBI>.
722
723 =cut
724
725 sub dbh {
726   my ($self) = @_;
727
728   if (not $self->_dbh) {
729     $self->_populate_dbh;
730   } else {
731     $self->ensure_connected;
732   }
733   return $self->_dbh;
734 }
735
736 sub _get_dbh {
737   my $self = shift;
738
739   if (not $self->_dbh) {
740     $self->_populate_dbh;
741   }
742   return $self->_dbh;
743 }
744
745 sub _sql_maker_args {
746     my ($self) = @_;
747
748     return (
749       bindtype=>'columns',
750       array_datatypes => 1,
751       limit_dialect => $self->_get_dbh,
752       %{$self->_sql_maker_opts}
753     );
754 }
755
756 sub sql_maker {
757   my ($self) = @_;
758   unless ($self->_sql_maker) {
759     my $sql_maker_class = $self->sql_maker_class;
760     $self->ensure_class_loaded ($sql_maker_class);
761     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
762   }
763   return $self->_sql_maker;
764 }
765
766 sub _rebless {}
767
768 sub _populate_dbh {
769   my ($self) = @_;
770
771   my @info = @{$self->_dbi_connect_info || []};
772   $self->_dbh($self->_connect(@info));
773
774   $self->_conn_pid($$);
775   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
776
777   $self->_determine_driver;
778
779   # Always set the transaction depth on connect, since
780   #  there is no transaction in progress by definition
781   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
782
783   $self->_run_connection_actions unless $self->{_in_determine_driver};
784 }
785
786 sub _run_connection_actions {
787   my $self = shift;
788   my @actions;
789
790   push @actions, ( $self->on_connect_call || () );
791   push @actions, $self->_parse_connect_do ('on_connect_do');
792
793   $self->_do_connection_actions(connect_call_ => $_) for @actions;
794 }
795
796 sub _determine_driver {
797   my ($self) = @_;
798
799   if (ref $self eq 'DBIx::Class::Storage::DBI') {
800     my $driver;
801     my $started_unconnected = 0;
802     local $self->{_in_determine_driver} = 1;
803
804     if ($self->_dbh) { # we are connected
805       $driver = $self->_dbh->{Driver}{Name};
806     } else {
807       # try to use dsn to not require being connected, the driver may still
808       # force a connection in _rebless to determine version
809       ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
810       $started_unconnected = 1;
811     }
812
813     my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
814     if ($self->load_optional_class($storage_class)) {
815       mro::set_mro($storage_class, 'c3');
816       bless $self, $storage_class;
817       $self->_rebless();
818     }
819
820     $self->_run_connection_actions
821         if $started_unconnected && defined $self->_dbh;
822   }
823 }
824
825 sub _do_connection_actions {
826   my $self          = shift;
827   my $method_prefix = shift;
828   my $call          = shift;
829
830   if (not ref($call)) {
831     my $method = $method_prefix . $call;
832     $self->$method(@_);
833   } elsif (ref($call) eq 'CODE') {
834     $self->$call(@_);
835   } elsif (ref($call) eq 'ARRAY') {
836     if (ref($call->[0]) ne 'ARRAY') {
837       $self->_do_connection_actions($method_prefix, $_) for @$call;
838     } else {
839       $self->_do_connection_actions($method_prefix, @$_) for @$call;
840     }
841   } else {
842     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
843   }
844
845   return $self;
846 }
847
848 sub connect_call_do_sql {
849   my $self = shift;
850   $self->_do_query(@_);
851 }
852
853 sub disconnect_call_do_sql {
854   my $self = shift;
855   $self->_do_query(@_);
856 }
857
858 # override in db-specific backend when necessary
859 sub connect_call_datetime_setup { 1 }
860
861 sub _do_query {
862   my ($self, $action) = @_;
863
864   if (ref $action eq 'CODE') {
865     $action = $action->($self);
866     $self->_do_query($_) foreach @$action;
867   }
868   else {
869     # Most debuggers expect ($sql, @bind), so we need to exclude
870     # the attribute hash which is the second argument to $dbh->do
871     # furthermore the bind values are usually to be presented
872     # as named arrayref pairs, so wrap those here too
873     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
874     my $sql = shift @do_args;
875     my $attrs = shift @do_args;
876     my @bind = map { [ undef, $_ ] } @do_args;
877
878     $self->_query_start($sql, @bind);
879     $self->_dbh->do($sql, $attrs, @do_args);
880     $self->_query_end($sql, @bind);
881   }
882
883   return $self;
884 }
885
886 sub _connect {
887   my ($self, @info) = @_;
888
889   $self->throw_exception("You failed to provide any connection info")
890     if !@info;
891
892   my ($old_connect_via, $dbh);
893
894   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
895     $old_connect_via = $DBI::connect_via;
896     $DBI::connect_via = 'connect';
897   }
898
899   eval {
900     if(ref $info[0] eq 'CODE') {
901        $dbh = &{$info[0]}
902     }
903     else {
904        $dbh = DBI->connect(@info);
905     }
906
907     if($dbh && !$self->unsafe) {
908       my $weak_self = $self;
909       Scalar::Util::weaken($weak_self);
910       $dbh->{HandleError} = sub {
911           if ($weak_self) {
912             $weak_self->throw_exception("DBI Exception: $_[0]");
913           }
914           else {
915             croak ("DBI Exception: $_[0]");
916           }
917       };
918       $dbh->{ShowErrorStatement} = 1;
919       $dbh->{RaiseError} = 1;
920       $dbh->{PrintError} = 0;
921     }
922   };
923
924   $DBI::connect_via = $old_connect_via if $old_connect_via;
925
926   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
927     if !$dbh || $@;
928
929   $self->_dbh_autocommit($dbh->{AutoCommit});
930
931   $dbh;
932 }
933
934 sub svp_begin {
935   my ($self, $name) = @_;
936
937   $name = $self->_svp_generate_name
938     unless defined $name;
939
940   $self->throw_exception ("You can't use savepoints outside a transaction")
941     if $self->{transaction_depth} == 0;
942
943   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
944     unless $self->can('_svp_begin');
945
946   push @{ $self->{savepoints} }, $name;
947
948   $self->debugobj->svp_begin($name) if $self->debug;
949
950   return $self->_svp_begin($name);
951 }
952
953 sub svp_release {
954   my ($self, $name) = @_;
955
956   $self->throw_exception ("You can't use savepoints outside a transaction")
957     if $self->{transaction_depth} == 0;
958
959   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
960     unless $self->can('_svp_release');
961
962   if (defined $name) {
963     $self->throw_exception ("Savepoint '$name' does not exist")
964       unless grep { $_ eq $name } @{ $self->{savepoints} };
965
966     # Dig through the stack until we find the one we are releasing.  This keeps
967     # the stack up to date.
968     my $svp;
969
970     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
971   } else {
972     $name = pop @{ $self->{savepoints} };
973   }
974
975   $self->debugobj->svp_release($name) if $self->debug;
976
977   return $self->_svp_release($name);
978 }
979
980 sub svp_rollback {
981   my ($self, $name) = @_;
982
983   $self->throw_exception ("You can't use savepoints outside a transaction")
984     if $self->{transaction_depth} == 0;
985
986   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
987     unless $self->can('_svp_rollback');
988
989   if (defined $name) {
990       # If they passed us a name, verify that it exists in the stack
991       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
992           $self->throw_exception("Savepoint '$name' does not exist!");
993       }
994
995       # Dig through the stack until we find the one we are releasing.  This keeps
996       # the stack up to date.
997       while(my $s = pop(@{ $self->{savepoints} })) {
998           last if($s eq $name);
999       }
1000       # Add the savepoint back to the stack, as a rollback doesn't remove the
1001       # named savepoint, only everything after it.
1002       push(@{ $self->{savepoints} }, $name);
1003   } else {
1004       # We'll assume they want to rollback to the last savepoint
1005       $name = $self->{savepoints}->[-1];
1006   }
1007
1008   $self->debugobj->svp_rollback($name) if $self->debug;
1009
1010   return $self->_svp_rollback($name);
1011 }
1012
1013 sub _svp_generate_name {
1014     my ($self) = @_;
1015
1016     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1017 }
1018
1019 sub txn_begin {
1020   my $self = shift;
1021   if($self->{transaction_depth} == 0) {
1022     $self->debugobj->txn_begin()
1023       if $self->debug;
1024     # this isn't ->_dbh-> because
1025     #  we should reconnect on begin_work
1026     #  for AutoCommit users
1027     $self->dbh_do(sub { $_[1]->begin_work });
1028   } elsif ($self->auto_savepoint) {
1029     $self->svp_begin;
1030   }
1031   $self->{transaction_depth}++;
1032 }
1033
1034 sub txn_commit {
1035   my $self = shift;
1036   if ($self->{transaction_depth} == 1) {
1037     my $dbh = $self->_dbh;
1038     $self->debugobj->txn_commit()
1039       if ($self->debug);
1040     $dbh->commit;
1041     $self->{transaction_depth} = 0
1042       if $self->_dbh_autocommit;
1043   }
1044   elsif($self->{transaction_depth} > 1) {
1045     $self->{transaction_depth}--;
1046     $self->svp_release
1047       if $self->auto_savepoint;
1048   }
1049 }
1050
1051 sub txn_rollback {
1052   my $self = shift;
1053   my $dbh = $self->_dbh;
1054   eval {
1055     if ($self->{transaction_depth} == 1) {
1056       $self->debugobj->txn_rollback()
1057         if ($self->debug);
1058       $self->{transaction_depth} = 0
1059         if $self->_dbh_autocommit;
1060       $dbh->rollback;
1061     }
1062     elsif($self->{transaction_depth} > 1) {
1063       $self->{transaction_depth}--;
1064       if ($self->auto_savepoint) {
1065         $self->svp_rollback;
1066         $self->svp_release;
1067       }
1068     }
1069     else {
1070       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1071     }
1072   };
1073   if ($@) {
1074     my $error = $@;
1075     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1076     $error =~ /$exception_class/ and $self->throw_exception($error);
1077     # ensure that a failed rollback resets the transaction depth
1078     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1079     $self->throw_exception($error);
1080   }
1081 }
1082
1083 # This used to be the top-half of _execute.  It was split out to make it
1084 #  easier to override in NoBindVars without duping the rest.  It takes up
1085 #  all of _execute's args, and emits $sql, @bind.
1086 sub _prep_for_execute {
1087   my ($self, $op, $extra_bind, $ident, $args) = @_;
1088
1089   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1090     $ident = $ident->from();
1091   }
1092
1093   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1094
1095   unshift(@bind,
1096     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1097       if $extra_bind;
1098   return ($sql, \@bind);
1099 }
1100
1101
1102 sub _fix_bind_params {
1103     my ($self, @bind) = @_;
1104
1105     ### Turn @bind from something like this:
1106     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1107     ### to this:
1108     ###   ( "'1'", "'1'", "'3'" )
1109     return
1110         map {
1111             if ( defined( $_ && $_->[1] ) ) {
1112                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1113             }
1114             else { q{'NULL'}; }
1115         } @bind;
1116 }
1117
1118 sub _query_start {
1119     my ( $self, $sql, @bind ) = @_;
1120
1121     if ( $self->debug ) {
1122         @bind = $self->_fix_bind_params(@bind);
1123
1124         $self->debugobj->query_start( $sql, @bind );
1125     }
1126 }
1127
1128 sub _query_end {
1129     my ( $self, $sql, @bind ) = @_;
1130
1131     if ( $self->debug ) {
1132         @bind = $self->_fix_bind_params(@bind);
1133         $self->debugobj->query_end( $sql, @bind );
1134     }
1135 }
1136
1137 sub _dbh_execute {
1138   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1139
1140   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1141
1142   $self->_query_start( $sql, @$bind );
1143
1144   my $sth = $self->sth($sql,$op);
1145
1146   my $placeholder_index = 1;
1147
1148   foreach my $bound (@$bind) {
1149     my $attributes = {};
1150     my($column_name, @data) = @$bound;
1151
1152     if ($bind_attributes) {
1153       $attributes = $bind_attributes->{$column_name}
1154       if defined $bind_attributes->{$column_name};
1155     }
1156
1157     foreach my $data (@data) {
1158       my $ref = ref $data;
1159       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1160
1161       $sth->bind_param($placeholder_index, $data, $attributes);
1162       $placeholder_index++;
1163     }
1164   }
1165
1166   # Can this fail without throwing an exception anyways???
1167   my $rv = $sth->execute();
1168   $self->throw_exception($sth->errstr) if !$rv;
1169
1170   $self->_query_end( $sql, @$bind );
1171
1172   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1173 }
1174
1175 sub _execute {
1176     my $self = shift;
1177     $self->dbh_do('_dbh_execute', @_)
1178 }
1179
1180 sub insert {
1181   my ($self, $source, $to_insert) = @_;
1182
1183   $self->_determine_driver;
1184
1185   my $ident = $source->from;
1186   my $bind_attributes = $self->source_bind_attributes($source);
1187
1188   my $updated_cols = {};
1189
1190   foreach my $col ( $source->columns ) {
1191     if ( !defined $to_insert->{$col} ) {
1192       my $col_info = $source->column_info($col);
1193
1194       if ( $col_info->{auto_nextval} ) {
1195         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1196           'nextval',
1197           $col_info->{sequence} ||
1198             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
1199         );
1200       }
1201     }
1202   }
1203
1204   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1205
1206   return $updated_cols;
1207 }
1208
1209 ## Still not quite perfect, and EXPERIMENTAL
1210 ## Currently it is assumed that all values passed will be "normal", i.e. not
1211 ## scalar refs, or at least, all the same type as the first set, the statement is
1212 ## only prepped once.
1213 sub insert_bulk {
1214   my ($self, $source, $cols, $data) = @_;
1215   my %colvalues;
1216   my $table = $source->from;
1217   @colvalues{@$cols} = (0..$#$cols);
1218   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1219
1220   $self->_determine_driver;
1221
1222   $self->_query_start( $sql, @bind );
1223   my $sth = $self->sth($sql);
1224
1225 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1226
1227   ## This must be an arrayref, else nothing works!
1228   my $tuple_status = [];
1229
1230   ## Get the bind_attributes, if any exist
1231   my $bind_attributes = $self->source_bind_attributes($source);
1232
1233   ## Bind the values and execute
1234   my $placeholder_index = 1;
1235
1236   foreach my $bound (@bind) {
1237
1238     my $attributes = {};
1239     my ($column_name, $data_index) = @$bound;
1240
1241     if( $bind_attributes ) {
1242       $attributes = $bind_attributes->{$column_name}
1243       if defined $bind_attributes->{$column_name};
1244     }
1245
1246     my @data = map { $_->[$data_index] } @$data;
1247
1248     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1249     $placeholder_index++;
1250   }
1251   my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
1252   if (my $err = $@) {
1253     my $i = 0;
1254     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1255
1256     $self->throw_exception($sth->errstr || "Unexpected populate error: $err")
1257       if ($i > $#$tuple_status);
1258
1259     require Data::Dumper;
1260     local $Data::Dumper::Terse = 1;
1261     local $Data::Dumper::Indent = 1;
1262     local $Data::Dumper::Useqq = 1;
1263     local $Data::Dumper::Quotekeys = 0;
1264
1265     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1266       $tuple_status->[$i][1],
1267       Data::Dumper::Dumper(
1268         { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) }
1269       ),
1270     );
1271   }
1272   $self->throw_exception($sth->errstr) if !$rv;
1273
1274   $self->_query_end( $sql, @bind );
1275   return (wantarray ? ($rv, $sth, @bind) : $rv);
1276 }
1277
1278 sub update {
1279   my $self = shift @_;
1280   my $source = shift @_;
1281   $self->_determine_driver;
1282   my $bind_attributes = $self->source_bind_attributes($source);
1283
1284   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1285 }
1286
1287
1288 sub delete {
1289   my $self = shift @_;
1290   my $source = shift @_;
1291   $self->_determine_driver;
1292   my $bind_attrs = $self->source_bind_attributes($source);
1293
1294   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1295 }
1296
1297 # We were sent here because the $rs contains a complex search
1298 # which will require a subquery to select the correct rows
1299 # (i.e. joined or limited resultsets)
1300 #
1301 # Genarating a single PK column subquery is trivial and supported
1302 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1303 # Look at _multipk_update_delete()
1304 sub _subq_update_delete {
1305   my $self = shift;
1306   my ($rs, $op, $values) = @_;
1307
1308   my $rsrc = $rs->result_source;
1309
1310   # we already check this, but double check naively just in case. Should be removed soon
1311   my $sel = $rs->_resolved_attrs->{select};
1312   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1313   my @pcols = $rsrc->primary_columns;
1314   if (@$sel != @pcols) {
1315     $self->throw_exception (
1316       'Subquery update/delete can not be called on resultsets selecting a'
1317      .' number of columns different than the number of primary keys'
1318     );
1319   }
1320
1321   if (@pcols == 1) {
1322     return $self->$op (
1323       $rsrc,
1324       $op eq 'update' ? $values : (),
1325       { $pcols[0] => { -in => $rs->as_query } },
1326     );
1327   }
1328
1329   else {
1330     return $self->_multipk_update_delete (@_);
1331   }
1332 }
1333
1334 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1335 # resultset update/delete involving subqueries. So by default resort
1336 # to simple (and inefficient) delete_all style per-row opearations,
1337 # while allowing specific storages to override this with a faster
1338 # implementation.
1339 #
1340 sub _multipk_update_delete {
1341   return shift->_per_row_update_delete (@_);
1342 }
1343
1344 # This is the default loop used to delete/update rows for multi PK
1345 # resultsets, and used by mysql exclusively (because it can't do anything
1346 # else).
1347 #
1348 # We do not use $row->$op style queries, because resultset update/delete
1349 # is not expected to cascade (this is what delete_all/update_all is for).
1350 #
1351 # There should be no race conditions as the entire operation is rolled
1352 # in a transaction.
1353 #
1354 sub _per_row_update_delete {
1355   my $self = shift;
1356   my ($rs, $op, $values) = @_;
1357
1358   my $rsrc = $rs->result_source;
1359   my @pcols = $rsrc->primary_columns;
1360
1361   my $guard = $self->txn_scope_guard;
1362
1363   # emulate the return value of $sth->execute for non-selects
1364   my $row_cnt = '0E0';
1365
1366   my $subrs_cur = $rs->cursor;
1367   while (my @pks = $subrs_cur->next) {
1368
1369     my $cond;
1370     for my $i (0.. $#pcols) {
1371       $cond->{$pcols[$i]} = $pks[$i];
1372     }
1373
1374     $self->$op (
1375       $rsrc,
1376       $op eq 'update' ? $values : (),
1377       $cond,
1378     );
1379
1380     $row_cnt++;
1381   }
1382
1383   $guard->commit;
1384
1385   return $row_cnt;
1386 }
1387
1388 sub _select {
1389   my $self = shift;
1390
1391   # localization is neccessary as
1392   # 1) there is no infrastructure to pass this around before SQLA2
1393   # 2) _select_args sets it and _prep_for_execute consumes it
1394   my $sql_maker = $self->sql_maker;
1395   local $sql_maker->{_dbic_rs_attrs};
1396
1397   return $self->_execute($self->_select_args(@_));
1398 }
1399
1400 sub _select_args_to_query {
1401   my $self = shift;
1402
1403   # localization is neccessary as
1404   # 1) there is no infrastructure to pass this around before SQLA2
1405   # 2) _select_args sets it and _prep_for_execute consumes it
1406   my $sql_maker = $self->sql_maker;
1407   local $sql_maker->{_dbic_rs_attrs};
1408
1409   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1410   #  = $self->_select_args($ident, $select, $cond, $attrs);
1411   my ($op, $bind, $ident, $bind_attrs, @args) =
1412     $self->_select_args(@_);
1413
1414   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1415   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1416   $prepared_bind ||= [];
1417
1418   return wantarray
1419     ? ($sql, $prepared_bind, $bind_attrs)
1420     : \[ "($sql)", @$prepared_bind ]
1421   ;
1422 }
1423
1424 sub _select_args {
1425   my ($self, $ident, $select, $where, $attrs) = @_;
1426
1427   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1428
1429   my $sql_maker = $self->sql_maker;
1430   $sql_maker->{_dbic_rs_attrs} = {
1431     %$attrs,
1432     select => $select,
1433     from => $ident,
1434     where => $where,
1435     $rs_alias
1436       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1437       : ()
1438     ,
1439   };
1440
1441   # calculate bind_attrs before possible $ident mangling
1442   my $bind_attrs = {};
1443   for my $alias (keys %$alias2source) {
1444     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1445     for my $col (keys %$bindtypes) {
1446
1447       my $fqcn = join ('.', $alias, $col);
1448       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1449
1450       # so that unqualified searches can be bound too
1451       $bind_attrs->{$col} = $bind_attrs->{$fqcn} if $alias eq $rs_alias;
1452     }
1453   }
1454
1455   # adjust limits
1456   if (
1457     $attrs->{software_limit}
1458       ||
1459     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1460   ) {
1461     $attrs->{software_limit} = 1;
1462   }
1463   else {
1464     $self->throw_exception("rows attribute must be positive if present")
1465       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1466
1467     # MySQL actually recommends this approach.  I cringe.
1468     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1469   }
1470
1471   my @limit;
1472
1473   # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
1474   # otherwise delegate the limiting to the storage, unless software limit was requested
1475   if (
1476     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1477        ||
1478     ( $attrs->{group_by} && @{$attrs->{group_by}} &&
1479       $attrs->{prefetch_select} && @{$attrs->{prefetch_select}} )
1480   ) {
1481     ($ident, $select, $where, $attrs)
1482       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1483   }
1484   elsif (! $attrs->{software_limit} ) {
1485     push @limit, $attrs->{rows}, $attrs->{offset};
1486   }
1487
1488 ###
1489   # This would be the point to deflate anything found in $where
1490   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1491   # expect a row object. And all we have is a resultsource (it is trivial
1492   # to extract deflator coderefs via $alias2source above).
1493   #
1494   # I don't see a way forward other than changing the way deflators are
1495   # invoked, and that's just bad...
1496 ###
1497
1498   my $order = { map
1499     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1500     (qw/order_by group_by having/ )
1501   };
1502
1503   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1504 }
1505
1506 #
1507 # This is the code producing joined subqueries like:
1508 # SELECT me.*, other.* FROM ( SELECT me.* FROM ... ) JOIN other ON ... 
1509 #
1510 sub _adjust_select_args_for_complex_prefetch {
1511   my ($self, $from, $select, $where, $attrs) = @_;
1512
1513   $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
1514     if (ref $from ne 'ARRAY');
1515
1516   # copies for mangling
1517   $from = [ @$from ];
1518   $select = [ @$select ];
1519   $attrs = { %$attrs };
1520
1521   # separate attributes
1522   my $sub_attrs = { %$attrs };
1523   delete $attrs->{$_} for qw/where bind rows offset group_by having/;
1524   delete $sub_attrs->{$_} for qw/for collapse prefetch_select _collapse_order_by select as/;
1525
1526   my $alias = $attrs->{alias};
1527   my $sql_maker = $self->sql_maker;
1528
1529   # create subquery select list - consider only stuff *not* brought in by the prefetch
1530   my $sub_select = [];
1531   for my $i (0 .. @{$attrs->{select}} - @{$attrs->{prefetch_select}} - 1) {
1532     my $sel = $attrs->{select}[$i];
1533
1534     # alias any functions to the dbic-side 'as' label
1535     # adjust the outer select accordingly
1536     if (ref $sel eq 'HASH' && !$sel->{-select}) {
1537       $sel = { -select => $sel, -as => $attrs->{as}[$i] };
1538       $select->[$i] = join ('.', $attrs->{alias}, ($attrs->{as}[$i] || "select_$i") );
1539     }
1540
1541     push @$sub_select, $sel;
1542   }
1543
1544   # bring over all non-collapse-induced order_by into the inner query (if any)
1545   # the outer one will have to keep them all
1546   delete $sub_attrs->{order_by};
1547   if (my $ord_cnt = @{$attrs->{order_by}} - @{$attrs->{_collapse_order_by}} ) {
1548     $sub_attrs->{order_by} = [
1549       @{$attrs->{order_by}}[ 0 .. $ord_cnt - 1]
1550     ];
1551   }
1552
1553   # mangle {from}
1554   my $join_root = shift @$from;
1555   my @outer_from = @$from;
1556
1557   my %inner_joins;
1558   my %join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
1559
1560   # in complex search_related chains $alias may *not* be 'me'
1561   # so always include it in the inner join, and also shift away
1562   # from the outer stack, so that the two datasets actually do
1563   # meet
1564   if ($join_root->{-alias} ne $alias) {
1565     $inner_joins{$alias} = 1;
1566
1567     while (@outer_from && $outer_from[0][0]{-alias} ne $alias) {
1568       shift @outer_from;
1569     }
1570     if (! @outer_from) {
1571       $self->throw_exception ("Unable to find '$alias' in the {from} stack, something is wrong");
1572     }
1573
1574     shift @outer_from; # the new subquery will represent this alias, so get rid of it
1575   }
1576
1577
1578   # decide which parts of the join will remain on the inside
1579   #
1580   # this is not a very viable optimisation, but it was written
1581   # before I realised this, so might as well remain. We can throw
1582   # away _any_ branches of the join tree that are:
1583   # 1) not mentioned in the condition/order
1584   # 2) left-join leaves (or left-join leaf chains)
1585   # Most of the join conditions will not satisfy this, but for real
1586   # complex queries some might, and we might make some RDBMS happy.
1587   #
1588   #
1589   # since we do not have introspectable SQLA, we fall back to ugly
1590   # scanning of raw SQL for WHERE, and for pieces of ORDER BY
1591   # in order to determine what goes into %inner_joins
1592   # It may not be very efficient, but it's a reasonable stop-gap
1593   {
1594     # produce stuff unquoted, so it can be scanned
1595     local $sql_maker->{quote_char};
1596     my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1597     $sep = "\Q$sep\E";
1598
1599     my @order_by = (map
1600       { ref $_ ? $_->[0] : $_ }
1601       $sql_maker->_order_by_chunks ($sub_attrs->{order_by})
1602     );
1603
1604     my $where_sql = $sql_maker->where ($where);
1605     my $select_sql = $sql_maker->_recurse_fields ($sub_select);
1606
1607     # sort needed joins
1608     for my $alias (keys %join_info) {
1609
1610       # any table alias found on a column name in where or order_by
1611       # gets included in %inner_joins
1612       # Also any parent joins that are needed to reach this particular alias
1613       for my $piece ($select_sql, $where_sql, @order_by ) {
1614         if ($piece =~ /\b $alias $sep/x) {
1615           $inner_joins{$alias} = 1;
1616         }
1617       }
1618     }
1619   }
1620
1621   # scan for non-leaf/non-left joins and mark as needed
1622   # also mark all ancestor joins that are needed to reach this particular alias
1623   # (e.g.  join => { cds => 'tracks' } - tracks will bring cds too )
1624   #
1625   # traverse by the size of the -join_path i.e. reverse depth first
1626   for my $alias (sort { @{$join_info{$b}{-join_path}} <=> @{$join_info{$a}{-join_path}} } (keys %join_info) ) {
1627
1628     my $j = $join_info{$alias};
1629     $inner_joins{$alias} = 1 if (! $j->{-join_type} || ($j->{-join_type} !~ /^left$/i) );
1630
1631     if ($inner_joins{$alias}) {
1632       $inner_joins{$_} = 1 for (@{$j->{-join_path}});
1633     }
1634   }
1635
1636   # construct the inner $from for the subquery
1637   my $inner_from = [ $join_root ];
1638   for my $j (@$from) {
1639     push @$inner_from, $j if $inner_joins{$j->[0]{-alias}};
1640   }
1641
1642   # if a multi-type join was needed in the subquery ("multi" is indicated by
1643   # presence in {collapse}) - add a group_by to simulate the collapse in the subq
1644   for my $alias (keys %inner_joins) {
1645
1646     # the dot comes from some weirdness in collapse
1647     # remove after the rewrite
1648     if ($attrs->{collapse}{".$alias"}) {
1649       $sub_attrs->{group_by} ||= $sub_select;
1650       last;
1651     }
1652   }
1653
1654   # generate the subquery
1655   my $subq = $self->_select_args_to_query (
1656     $inner_from,
1657     $sub_select,
1658     $where,
1659     $sub_attrs
1660   );
1661
1662   # put it in the new {from}
1663   unshift @outer_from, {
1664     -alias => $alias,
1665     -source_handle => $join_root->{-source_handle},
1666     $alias => $subq,
1667   };
1668
1669   # This is totally horrific - the $where ends up in both the inner and outer query
1670   # Unfortunately not much can be done until SQLA2 introspection arrives, and even
1671   # then if where conditions apply to the *right* side of the prefetch, you may have
1672   # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
1673   # the outer select to exclude joins you didin't want in the first place
1674   #
1675   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
1676   return (\@outer_from, $select, $where, $attrs);
1677 }
1678
1679 sub _resolve_ident_sources {
1680   my ($self, $ident) = @_;
1681
1682   my $alias2source = {};
1683   my $rs_alias;
1684
1685   # the reason this is so contrived is that $ident may be a {from}
1686   # structure, specifying multiple tables to join
1687   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1688     # this is compat mode for insert/update/delete which do not deal with aliases
1689     $alias2source->{me} = $ident;
1690     $rs_alias = 'me';
1691   }
1692   elsif (ref $ident eq 'ARRAY') {
1693
1694     for (@$ident) {
1695       my $tabinfo;
1696       if (ref $_ eq 'HASH') {
1697         $tabinfo = $_;
1698         $rs_alias = $tabinfo->{-alias};
1699       }
1700       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
1701         $tabinfo = $_->[0];
1702       }
1703
1704       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
1705         if ($tabinfo->{-source_handle});
1706     }
1707   }
1708
1709   return ($alias2source, $rs_alias);
1710 }
1711
1712 # Takes $ident, \@column_names
1713 #
1714 # returns { $column_name => \%column_info, ... }
1715 # also note: this adds -result_source => $rsrc to the column info
1716 #
1717 # usage:
1718 #   my $col_sources = $self->_resolve_column_info($ident, [map $_->[0], @{$bind}]);
1719 sub _resolve_column_info {
1720   my ($self, $ident, $colnames) = @_;
1721   my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
1722
1723   my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1724   $sep = "\Q$sep\E";
1725
1726   my (%return, %converted);
1727   foreach my $col (@$colnames) {
1728     my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
1729
1730     # deal with unqualified cols - we assume the main alias for all
1731     # unqualified ones, ugly but can't think of anything better right now
1732     $alias ||= $root_alias;
1733
1734     my $rsrc = $alias2src->{$alias};
1735     $return{$col} = $rsrc && { %{$rsrc->column_info($colname)}, -result_source => $rsrc };
1736   }
1737   return \%return;
1738 }
1739
1740 # Returns a counting SELECT for a simple count
1741 # query. Abstracted so that a storage could override
1742 # this to { count => 'firstcol' } or whatever makes
1743 # sense as a performance optimization
1744 sub _count_select {
1745   #my ($self, $source, $rs_attrs) = @_;
1746   return { count => '*' };
1747 }
1748
1749 # Returns a SELECT which will end up in the subselect
1750 # There may or may not be a group_by, as the subquery
1751 # might have been called to accomodate a limit
1752 #
1753 # Most databases would be happy with whatever ends up
1754 # here, but some choke in various ways.
1755 #
1756 sub _subq_count_select {
1757   my ($self, $source, $rs_attrs) = @_;
1758   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
1759
1760   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1761   return @pcols ? \@pcols : [ 1 ];
1762 }
1763
1764
1765 sub source_bind_attributes {
1766   my ($self, $source) = @_;
1767
1768   my $bind_attributes;
1769   foreach my $column ($source->columns) {
1770
1771     my $data_type = $source->column_info($column)->{data_type} || '';
1772     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1773      if $data_type;
1774   }
1775
1776   return $bind_attributes;
1777 }
1778
1779 =head2 select
1780
1781 =over 4
1782
1783 =item Arguments: $ident, $select, $condition, $attrs
1784
1785 =back
1786
1787 Handle a SQL select statement.
1788
1789 =cut
1790
1791 sub select {
1792   my $self = shift;
1793   my ($ident, $select, $condition, $attrs) = @_;
1794   return $self->cursor_class->new($self, \@_, $attrs);
1795 }
1796
1797 sub select_single {
1798   my $self = shift;
1799   my ($rv, $sth, @bind) = $self->_select(@_);
1800   my @row = $sth->fetchrow_array;
1801   my @nextrow = $sth->fetchrow_array if @row;
1802   if(@row && @nextrow) {
1803     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1804   }
1805   # Need to call finish() to work round broken DBDs
1806   $sth->finish();
1807   return @row;
1808 }
1809
1810 =head2 sth
1811
1812 =over 4
1813
1814 =item Arguments: $sql
1815
1816 =back
1817
1818 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1819
1820 =cut
1821
1822 sub _dbh_sth {
1823   my ($self, $dbh, $sql) = @_;
1824
1825   # 3 is the if_active parameter which avoids active sth re-use
1826   my $sth = $self->disable_sth_caching
1827     ? $dbh->prepare($sql)
1828     : $dbh->prepare_cached($sql, {}, 3);
1829
1830   # XXX You would think RaiseError would make this impossible,
1831   #  but apparently that's not true :(
1832   $self->throw_exception($dbh->errstr) if !$sth;
1833
1834   $sth;
1835 }
1836
1837 sub sth {
1838   my ($self, $sql) = @_;
1839   $self->dbh_do('_dbh_sth', $sql);
1840 }
1841
1842 sub _dbh_columns_info_for {
1843   my ($self, $dbh, $table) = @_;
1844
1845   if ($dbh->can('column_info')) {
1846     my %result;
1847     eval {
1848       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1849       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1850       $sth->execute();
1851       while ( my $info = $sth->fetchrow_hashref() ){
1852         my %column_info;
1853         $column_info{data_type}   = $info->{TYPE_NAME};
1854         $column_info{size}      = $info->{COLUMN_SIZE};
1855         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1856         $column_info{default_value} = $info->{COLUMN_DEF};
1857         my $col_name = $info->{COLUMN_NAME};
1858         $col_name =~ s/^\"(.*)\"$/$1/;
1859
1860         $result{$col_name} = \%column_info;
1861       }
1862     };
1863     return \%result if !$@ && scalar keys %result;
1864   }
1865
1866   my %result;
1867   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1868   $sth->execute;
1869   my @columns = @{$sth->{NAME_lc}};
1870   for my $i ( 0 .. $#columns ){
1871     my %column_info;
1872     $column_info{data_type} = $sth->{TYPE}->[$i];
1873     $column_info{size} = $sth->{PRECISION}->[$i];
1874     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1875
1876     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1877       $column_info{data_type} = $1;
1878       $column_info{size}    = $2;
1879     }
1880
1881     $result{$columns[$i]} = \%column_info;
1882   }
1883   $sth->finish;
1884
1885   foreach my $col (keys %result) {
1886     my $colinfo = $result{$col};
1887     my $type_num = $colinfo->{data_type};
1888     my $type_name;
1889     if(defined $type_num && $dbh->can('type_info')) {
1890       my $type_info = $dbh->type_info($type_num);
1891       $type_name = $type_info->{TYPE_NAME} if $type_info;
1892       $colinfo->{data_type} = $type_name if $type_name;
1893     }
1894   }
1895
1896   return \%result;
1897 }
1898
1899 sub columns_info_for {
1900   my ($self, $table) = @_;
1901   $self->dbh_do('_dbh_columns_info_for', $table);
1902 }
1903
1904 =head2 last_insert_id
1905
1906 Return the row id of the last insert.
1907
1908 =cut
1909
1910 sub _dbh_last_insert_id {
1911     # All Storage's need to register their own _dbh_last_insert_id
1912     # the old SQLite-based method was highly inappropriate
1913
1914     my $self = shift;
1915     my $class = ref $self;
1916     $self->throw_exception (<<EOE);
1917
1918 No _dbh_last_insert_id() method found in $class.
1919 Since the method of obtaining the autoincrement id of the last insert
1920 operation varies greatly between different databases, this method must be
1921 individually implemented for every storage class.
1922 EOE
1923 }
1924
1925 sub last_insert_id {
1926   my $self = shift;
1927   $self->dbh_do('_dbh_last_insert_id', @_);
1928 }
1929
1930 =head2 sqlt_type
1931
1932 Returns the database driver name.
1933
1934 =cut
1935
1936 sub sqlt_type { shift->_get_dbh->{Driver}->{Name} }
1937
1938 =head2 bind_attribute_by_data_type
1939
1940 Given a datatype from column info, returns a database specific bind
1941 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
1942 let the database planner just handle it.
1943
1944 Generally only needed for special case column types, like bytea in postgres.
1945
1946 =cut
1947
1948 sub bind_attribute_by_data_type {
1949     return;
1950 }
1951
1952 =head2 is_datatype_numeric
1953
1954 Given a datatype from column_info, returns a boolean value indicating if
1955 the current RDBMS considers it a numeric value. This controls how
1956 L<DBIx::Class::Row/set_column> decides whether to mark the column as
1957 dirty - when the datatype is deemed numeric a C<< != >> comparison will
1958 be performed instead of the usual C<eq>.
1959
1960 =cut
1961
1962 sub is_datatype_numeric {
1963   my ($self, $dt) = @_;
1964
1965   return 0 unless $dt;
1966
1967   return $dt =~ /^ (?:
1968     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
1969   ) $/ix;
1970 }
1971
1972
1973 =head2 create_ddl_dir (EXPERIMENTAL)
1974
1975 =over 4
1976
1977 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1978
1979 =back
1980
1981 Creates a SQL file based on the Schema, for each of the specified
1982 database engines in C<\@databases> in the given directory.
1983 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
1984
1985 Given a previous version number, this will also create a file containing
1986 the ALTER TABLE statements to transform the previous schema into the
1987 current one. Note that these statements may contain C<DROP TABLE> or
1988 C<DROP COLUMN> statements that can potentially destroy data.
1989
1990 The file names are created using the C<ddl_filename> method below, please
1991 override this method in your schema if you would like a different file
1992 name format. For the ALTER file, the same format is used, replacing
1993 $version in the name with "$preversion-$version".
1994
1995 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
1996 The most common value for this would be C<< { add_drop_table => 1 } >>
1997 to have the SQL produced include a C<DROP TABLE> statement for each table
1998 created. For quoting purposes supply C<quote_table_names> and
1999 C<quote_field_names>.
2000
2001 If no arguments are passed, then the following default values are assumed:
2002
2003 =over 4
2004
2005 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2006
2007 =item version    - $schema->schema_version
2008
2009 =item directory  - './'
2010
2011 =item preversion - <none>
2012
2013 =back
2014
2015 By default, C<\%sqlt_args> will have
2016
2017  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2018
2019 merged with the hash passed in. To disable any of those features, pass in a
2020 hashref like the following
2021
2022  { ignore_constraint_names => 0, # ... other options }
2023
2024
2025 Note that this feature is currently EXPERIMENTAL and may not work correctly
2026 across all databases, or fully handle complex relationships.
2027
2028 WARNING: Please check all SQL files created, before applying them.
2029
2030 =cut
2031
2032 sub create_ddl_dir {
2033   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2034
2035   if(!$dir || !-d $dir) {
2036     carp "No directory given, using ./\n";
2037     $dir = "./";
2038   }
2039   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2040   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2041
2042   my $schema_version = $schema->schema_version || '1.x';
2043   $version ||= $schema_version;
2044
2045   $sqltargs = {
2046     add_drop_table => 1,
2047     ignore_constraint_names => 1,
2048     ignore_index_names => 1,
2049     %{$sqltargs || {}}
2050   };
2051
2052   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
2053       . $self->_check_sqlt_message . q{'})
2054           if !$self->_check_sqlt_version;
2055
2056   my $sqlt = SQL::Translator->new( $sqltargs );
2057
2058   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2059   my $sqlt_schema = $sqlt->translate({ data => $schema })
2060     or $self->throw_exception ($sqlt->error);
2061
2062   foreach my $db (@$databases) {
2063     $sqlt->reset();
2064     $sqlt->{schema} = $sqlt_schema;
2065     $sqlt->producer($db);
2066
2067     my $file;
2068     my $filename = $schema->ddl_filename($db, $version, $dir);
2069     if (-e $filename && ($version eq $schema_version )) {
2070       # if we are dumping the current version, overwrite the DDL
2071       carp "Overwriting existing DDL file - $filename";
2072       unlink($filename);
2073     }
2074
2075     my $output = $sqlt->translate;
2076     if(!$output) {
2077       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2078       next;
2079     }
2080     if(!open($file, ">$filename")) {
2081       $self->throw_exception("Can't open $filename for writing ($!)");
2082       next;
2083     }
2084     print $file $output;
2085     close($file);
2086
2087     next unless ($preversion);
2088
2089     require SQL::Translator::Diff;
2090
2091     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2092     if(!-e $prefilename) {
2093       carp("No previous schema file found ($prefilename)");
2094       next;
2095     }
2096
2097     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2098     if(-e $difffile) {
2099       carp("Overwriting existing diff file - $difffile");
2100       unlink($difffile);
2101     }
2102
2103     my $source_schema;
2104     {
2105       my $t = SQL::Translator->new($sqltargs);
2106       $t->debug( 0 );
2107       $t->trace( 0 );
2108
2109       $t->parser( $db )
2110         or $self->throw_exception ($t->error);
2111
2112       my $out = $t->translate( $prefilename )
2113         or $self->throw_exception ($t->error);
2114
2115       $source_schema = $t->schema;
2116
2117       $source_schema->name( $prefilename )
2118         unless ( $source_schema->name );
2119     }
2120
2121     # The "new" style of producers have sane normalization and can support
2122     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2123     # And we have to diff parsed SQL against parsed SQL.
2124     my $dest_schema = $sqlt_schema;
2125
2126     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2127       my $t = SQL::Translator->new($sqltargs);
2128       $t->debug( 0 );
2129       $t->trace( 0 );
2130
2131       $t->parser( $db )
2132         or $self->throw_exception ($t->error);
2133
2134       my $out = $t->translate( $filename )
2135         or $self->throw_exception ($t->error);
2136
2137       $dest_schema = $t->schema;
2138
2139       $dest_schema->name( $filename )
2140         unless $dest_schema->name;
2141     }
2142
2143     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2144                                                   $dest_schema,   $db,
2145                                                   $sqltargs
2146                                                  );
2147     if(!open $file, ">$difffile") {
2148       $self->throw_exception("Can't write to $difffile ($!)");
2149       next;
2150     }
2151     print $file $diff;
2152     close($file);
2153   }
2154 }
2155
2156 =head2 deployment_statements
2157
2158 =over 4
2159
2160 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2161
2162 =back
2163
2164 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2165
2166 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2167 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2168
2169 C<$directory> is used to return statements from files in a previously created
2170 L</create_ddl_dir> directory and is optional. The filenames are constructed
2171 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2172
2173 If no C<$directory> is specified then the statements are constructed on the
2174 fly using L<SQL::Translator> and C<$version> is ignored.
2175
2176 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2177
2178 =cut
2179
2180 sub deployment_statements {
2181   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2182   # Need to be connected to get the correct sqlt_type
2183   $self->_get_dbh() unless $type;
2184   $type ||= $self->sqlt_type;
2185   $version ||= $schema->schema_version || '1.x';
2186   $dir ||= './';
2187   my $filename = $schema->ddl_filename($type, $version, $dir);
2188   if(-f $filename)
2189   {
2190       my $file;
2191       open($file, "<$filename")
2192         or $self->throw_exception("Can't open $filename ($!)");
2193       my @rows = <$file>;
2194       close($file);
2195       return join('', @rows);
2196   }
2197
2198   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
2199       . $self->_check_sqlt_message . q{'})
2200           if !$self->_check_sqlt_version;
2201
2202   require SQL::Translator::Parser::DBIx::Class;
2203   eval qq{use SQL::Translator::Producer::${type}};
2204   $self->throw_exception($@) if $@;
2205
2206   # sources needs to be a parser arg, but for simplicty allow at top level
2207   # coming in
2208   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2209       if exists $sqltargs->{sources};
2210
2211   my $tr = SQL::Translator->new(%$sqltargs);
2212   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
2213   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
2214 }
2215
2216 sub deploy {
2217   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2218   my $deploy = sub {
2219     my $line = shift;
2220     return if($line =~ /^--/);
2221     return if(!$line);
2222     # next if($line =~ /^DROP/m);
2223     return if($line =~ /^BEGIN TRANSACTION/m);
2224     return if($line =~ /^COMMIT/m);
2225     return if $line =~ /^\s+$/; # skip whitespace only
2226     $self->_query_start($line);
2227     eval {
2228       $self->_get_dbh->do($line);
2229     };
2230     if ($@) {
2231       carp qq{$@ (running "${line}")};
2232     }
2233     $self->_query_end($line);
2234   };
2235   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2236   if (@statements > 1) {
2237     foreach my $statement (@statements) {
2238       $deploy->( $statement );
2239     }
2240   }
2241   elsif (@statements == 1) {
2242     foreach my $line ( split(";\n", $statements[0])) {
2243       $deploy->( $line );
2244     }
2245   }
2246 }
2247
2248 =head2 datetime_parser
2249
2250 Returns the datetime parser class
2251
2252 =cut
2253
2254 sub datetime_parser {
2255   my $self = shift;
2256   return $self->{datetime_parser} ||= do {
2257     $self->_get_dbh;
2258     $self->build_datetime_parser(@_);
2259   };
2260 }
2261
2262 =head2 datetime_parser_type
2263
2264 Defines (returns) the datetime parser class - currently hardwired to
2265 L<DateTime::Format::MySQL>
2266
2267 =cut
2268
2269 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2270
2271 =head2 build_datetime_parser
2272
2273 See L</datetime_parser>
2274
2275 =cut
2276
2277 sub build_datetime_parser {
2278   my $self = shift;
2279   my $type = $self->datetime_parser_type(@_);
2280   eval "use ${type}";
2281   $self->throw_exception("Couldn't load ${type}: $@") if $@;
2282   return $type;
2283 }
2284
2285 {
2286     my $_check_sqlt_version; # private
2287     my $_check_sqlt_message; # private
2288     sub _check_sqlt_version {
2289         return $_check_sqlt_version if defined $_check_sqlt_version;
2290         eval 'use SQL::Translator "0.09003"';
2291         $_check_sqlt_message = $@ || '';
2292         $_check_sqlt_version = !$@;
2293     }
2294
2295     sub _check_sqlt_message {
2296         _check_sqlt_version if !defined $_check_sqlt_message;
2297         $_check_sqlt_message;
2298     }
2299 }
2300
2301 =head2 is_replicating
2302
2303 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2304 replicate from a master database.  Default is undef, which is the result
2305 returned by databases that don't support replication.
2306
2307 =cut
2308
2309 sub is_replicating {
2310     return;
2311
2312 }
2313
2314 =head2 lag_behind_master
2315
2316 Returns a number that represents a certain amount of lag behind a master db
2317 when a given storage is replicating.  The number is database dependent, but
2318 starts at zero and increases with the amount of lag. Default in undef
2319
2320 =cut
2321
2322 sub lag_behind_master {
2323     return;
2324 }
2325
2326 sub DESTROY {
2327   my $self = shift;
2328   return if !$self->_dbh;
2329   $self->_verify_pid;
2330   $self->_dbh(undef);
2331 }
2332
2333 1;
2334
2335 =head1 USAGE NOTES
2336
2337 =head2 DBIx::Class and AutoCommit
2338
2339 DBIx::Class can do some wonderful magic with handling exceptions,
2340 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2341 combined with C<txn_do> for transaction support.
2342
2343 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2344 in an assumed transaction between commits, and you're telling us you'd
2345 like to manage that manually.  A lot of the magic protections offered by
2346 this module will go away.  We can't protect you from exceptions due to database
2347 disconnects because we don't know anything about how to restart your
2348 transactions.  You're on your own for handling all sorts of exceptional
2349 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2350 be with raw DBI.
2351
2352
2353
2354 =head1 AUTHORS
2355
2356 Matt S. Trout <mst@shadowcatsystems.co.uk>
2357
2358 Andy Grundman <andy@hybridized.org>
2359
2360 =head1 LICENSE
2361
2362 You may distribute this code under the same terms as Perl itself.
2363
2364 =cut