Merge 'trunk' into 'prefetch_bug-unqualified_column_in_search_related_cond'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17
18 # what version of sqlt do we require if deploy() without a ddl_dir is invoked
19 # when changing also adjust the corresponding author_require in Makefile.PL
20 my $minimum_sqlt_version = '0.11002';
21
22
23 __PACKAGE__->mk_group_accessors('simple' =>
24   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
25      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
26 );
27
28 # the values for these accessors are picked out (and deleted) from
29 # the attribute hashref passed to connect_info
30 my @storage_options = qw/
31   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
32   disable_sth_caching unsafe auto_savepoint
33 /;
34 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
35
36
37 # default cursor class, overridable in connect_info attributes
38 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
39
40 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
41 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
42
43
44 # Each of these methods need _determine_driver called before itself
45 # in order to function reliably. This is a purely DRY optimization
46 my @rdbms_specific_methods = qw/
47   sqlt_type
48   build_datetime_parser
49   datetime_parser_type
50
51   insert
52   insert_bulk
53   update
54   delete
55   select
56   select_single
57 /;
58
59 for my $meth (@rdbms_specific_methods) {
60
61   my $orig = __PACKAGE__->can ($meth)
62     or next;
63
64   no strict qw/refs/;
65   no warnings qw/redefine/;
66   *{__PACKAGE__ ."::$meth"} = sub {
67     if (not $_[0]->_driver_determined) {
68       $_[0]->_determine_driver;
69       goto $_[0]->can($meth);
70     }
71     $orig->(@_);
72   };
73 }
74
75
76 =head1 NAME
77
78 DBIx::Class::Storage::DBI - DBI storage handler
79
80 =head1 SYNOPSIS
81
82   my $schema = MySchema->connect('dbi:SQLite:my.db');
83
84   $schema->storage->debug(1);
85
86   my @stuff = $schema->storage->dbh_do(
87     sub {
88       my ($storage, $dbh, @args) = @_;
89       $dbh->do("DROP TABLE authors");
90     },
91     @column_list
92   );
93
94   $schema->resultset('Book')->search({
95      written_on => $schema->storage->datetime_parser(DateTime->now)
96   });
97
98 =head1 DESCRIPTION
99
100 This class represents the connection to an RDBMS via L<DBI>.  See
101 L<DBIx::Class::Storage> for general information.  This pod only
102 documents DBI-specific methods and behaviors.
103
104 =head1 METHODS
105
106 =cut
107
108 sub new {
109   my $new = shift->next::method(@_);
110
111   $new->transaction_depth(0);
112   $new->_sql_maker_opts({});
113   $new->{savepoints} = [];
114   $new->{_in_dbh_do} = 0;
115   $new->{_dbh_gen} = 0;
116
117   $new;
118 }
119
120 =head2 connect_info
121
122 This method is normally called by L<DBIx::Class::Schema/connection>, which
123 encapsulates its argument list in an arrayref before passing them here.
124
125 The argument list may contain:
126
127 =over
128
129 =item *
130
131 The same 4-element argument set one would normally pass to
132 L<DBI/connect>, optionally followed by
133 L<extra attributes|/DBIx::Class specific connection attributes>
134 recognized by DBIx::Class:
135
136   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
137
138 =item *
139
140 A single code reference which returns a connected
141 L<DBI database handle|DBI/connect> optionally followed by
142 L<extra attributes|/DBIx::Class specific connection attributes> recognized
143 by DBIx::Class:
144
145   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
146
147 =item *
148
149 A single hashref with all the attributes and the dsn/user/password
150 mixed together:
151
152   $connect_info_args = [{
153     dsn => $dsn,
154     user => $user,
155     password => $pass,
156     %dbi_attributes,
157     %extra_attributes,
158   }];
159
160   $connect_info_args = [{
161     dbh_maker => sub { DBI->connect (...) },
162     %dbi_attributes,
163     %extra_attributes,
164   }];
165
166 This is particularly useful for L<Catalyst> based applications, allowing the
167 following config (L<Config::General> style):
168
169   <Model::DB>
170     schema_class   App::DB
171     <connect_info>
172       dsn          dbi:mysql:database=test
173       user         testuser
174       password     TestPass
175       AutoCommit   1
176     </connect_info>
177   </Model::DB>
178
179 The C<dsn>/C<user>/C<password> combination can be substituted by the
180 C<dbh_maker> key whose value is a coderef that returns a connected
181 L<DBI database handle|DBI/connect>
182
183 =back
184
185 Please note that the L<DBI> docs recommend that you always explicitly
186 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
187 recommends that it be set to I<1>, and that you perform transactions
188 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
189 to I<1> if you do not do explicitly set it to zero.  This is the default
190 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
191
192 =head3 DBIx::Class specific connection attributes
193
194 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
195 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
196 the following connection options. These options can be mixed in with your other
197 L<DBI> connection attributes, or placed in a seperate hashref
198 (C<\%extra_attributes>) as shown above.
199
200 Every time C<connect_info> is invoked, any previous settings for
201 these options will be cleared before setting the new ones, regardless of
202 whether any options are specified in the new C<connect_info>.
203
204
205 =over
206
207 =item on_connect_do
208
209 Specifies things to do immediately after connecting or re-connecting to
210 the database.  Its value may contain:
211
212 =over
213
214 =item a scalar
215
216 This contains one SQL statement to execute.
217
218 =item an array reference
219
220 This contains SQL statements to execute in order.  Each element contains
221 a string or a code reference that returns a string.
222
223 =item a code reference
224
225 This contains some code to execute.  Unlike code references within an
226 array reference, its return value is ignored.
227
228 =back
229
230 =item on_disconnect_do
231
232 Takes arguments in the same form as L</on_connect_do> and executes them
233 immediately before disconnecting from the database.
234
235 Note, this only runs if you explicitly call L</disconnect> on the
236 storage object.
237
238 =item on_connect_call
239
240 A more generalized form of L</on_connect_do> that calls the specified
241 C<connect_call_METHOD> methods in your storage driver.
242
243   on_connect_do => 'select 1'
244
245 is equivalent to:
246
247   on_connect_call => [ [ do_sql => 'select 1' ] ]
248
249 Its values may contain:
250
251 =over
252
253 =item a scalar
254
255 Will call the C<connect_call_METHOD> method.
256
257 =item a code reference
258
259 Will execute C<< $code->($storage) >>
260
261 =item an array reference
262
263 Each value can be a method name or code reference.
264
265 =item an array of arrays
266
267 For each array, the first item is taken to be the C<connect_call_> method name
268 or code reference, and the rest are parameters to it.
269
270 =back
271
272 Some predefined storage methods you may use:
273
274 =over
275
276 =item do_sql
277
278 Executes a SQL string or a code reference that returns a SQL string. This is
279 what L</on_connect_do> and L</on_disconnect_do> use.
280
281 It can take:
282
283 =over
284
285 =item a scalar
286
287 Will execute the scalar as SQL.
288
289 =item an arrayref
290
291 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
292 attributes hashref and bind values.
293
294 =item a code reference
295
296 Will execute C<< $code->($storage) >> and execute the return array refs as
297 above.
298
299 =back
300
301 =item datetime_setup
302
303 Execute any statements necessary to initialize the database session to return
304 and accept datetime/timestamp values used with
305 L<DBIx::Class::InflateColumn::DateTime>.
306
307 Only necessary for some databases, see your specific storage driver for
308 implementation details.
309
310 =back
311
312 =item on_disconnect_call
313
314 Takes arguments in the same form as L</on_connect_call> and executes them
315 immediately before disconnecting from the database.
316
317 Calls the C<disconnect_call_METHOD> methods as opposed to the
318 C<connect_call_METHOD> methods called by L</on_connect_call>.
319
320 Note, this only runs if you explicitly call L</disconnect> on the
321 storage object.
322
323 =item disable_sth_caching
324
325 If set to a true value, this option will disable the caching of
326 statement handles via L<DBI/prepare_cached>.
327
328 =item limit_dialect
329
330 Sets the limit dialect. This is useful for JDBC-bridge among others
331 where the remote SQL-dialect cannot be determined by the name of the
332 driver alone. See also L<SQL::Abstract::Limit>.
333
334 =item quote_char
335
336 Specifies what characters to use to quote table and column names. If
337 you use this you will want to specify L</name_sep> as well.
338
339 C<quote_char> expects either a single character, in which case is it
340 is placed on either side of the table/column name, or an arrayref of length
341 2 in which case the table/column name is placed between the elements.
342
343 For example under MySQL you should use C<< quote_char => '`' >>, and for
344 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
345
346 =item name_sep
347
348 This only needs to be used in conjunction with C<quote_char>, and is used to
349 specify the charecter that seperates elements (schemas, tables, columns) from
350 each other. In most cases this is simply a C<.>.
351
352 The consequences of not supplying this value is that L<SQL::Abstract>
353 will assume DBIx::Class' uses of aliases to be complete column
354 names. The output will look like I<"me.name"> when it should actually
355 be I<"me"."name">.
356
357 =item unsafe
358
359 This Storage driver normally installs its own C<HandleError>, sets
360 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
361 all database handles, including those supplied by a coderef.  It does this
362 so that it can have consistent and useful error behavior.
363
364 If you set this option to a true value, Storage will not do its usual
365 modifications to the database handle's attributes, and instead relies on
366 the settings in your connect_info DBI options (or the values you set in
367 your connection coderef, in the case that you are connecting via coderef).
368
369 Note that your custom settings can cause Storage to malfunction,
370 especially if you set a C<HandleError> handler that suppresses exceptions
371 and/or disable C<RaiseError>.
372
373 =item auto_savepoint
374
375 If this option is true, L<DBIx::Class> will use savepoints when nesting
376 transactions, making it possible to recover from failure in the inner
377 transaction without having to abort all outer transactions.
378
379 =item cursor_class
380
381 Use this argument to supply a cursor class other than the default
382 L<DBIx::Class::Storage::DBI::Cursor>.
383
384 =back
385
386 Some real-life examples of arguments to L</connect_info> and
387 L<DBIx::Class::Schema/connect>
388
389   # Simple SQLite connection
390   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
391
392   # Connect via subref
393   ->connect_info([ sub { DBI->connect(...) } ]);
394
395   # Connect via subref in hashref
396   ->connect_info([{
397     dbh_maker => sub { DBI->connect(...) },
398     on_connect_do => 'alter session ...',
399   }]);
400
401   # A bit more complicated
402   ->connect_info(
403     [
404       'dbi:Pg:dbname=foo',
405       'postgres',
406       'my_pg_password',
407       { AutoCommit => 1 },
408       { quote_char => q{"}, name_sep => q{.} },
409     ]
410   );
411
412   # Equivalent to the previous example
413   ->connect_info(
414     [
415       'dbi:Pg:dbname=foo',
416       'postgres',
417       'my_pg_password',
418       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
419     ]
420   );
421
422   # Same, but with hashref as argument
423   # See parse_connect_info for explanation
424   ->connect_info(
425     [{
426       dsn         => 'dbi:Pg:dbname=foo',
427       user        => 'postgres',
428       password    => 'my_pg_password',
429       AutoCommit  => 1,
430       quote_char  => q{"},
431       name_sep    => q{.},
432     }]
433   );
434
435   # Subref + DBIx::Class-specific connection options
436   ->connect_info(
437     [
438       sub { DBI->connect(...) },
439       {
440           quote_char => q{`},
441           name_sep => q{@},
442           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
443           disable_sth_caching => 1,
444       },
445     ]
446   );
447
448
449
450 =cut
451
452 sub connect_info {
453   my ($self, $info_arg) = @_;
454
455   return $self->_connect_info if !$info_arg;
456
457   my @args = @$info_arg;  # take a shallow copy for further mutilation
458   $self->_connect_info([@args]); # copy for _connect_info
459
460
461   # combine/pre-parse arguments depending on invocation style
462
463   my %attrs;
464   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
465     %attrs = %{ $args[1] || {} };
466     @args = $args[0];
467   }
468   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
469     %attrs = %{$args[0]};
470     @args = ();
471     if (my $code = delete $attrs{dbh_maker}) {
472       @args = $code;
473
474       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
475       if (@ignored) {
476         carp sprintf (
477             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
478           . "to the result of 'dbh_maker'",
479
480           join (', ', map { "'$_'" } (@ignored) ),
481         );
482       }
483     }
484     else {
485       @args = delete @attrs{qw/dsn user password/};
486     }
487   }
488   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
489     %attrs = (
490       % { $args[3] || {} },
491       % { $args[4] || {} },
492     );
493     @args = @args[0,1,2];
494   }
495
496   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
497   #  the new set of options
498   $self->_sql_maker(undef);
499   $self->_sql_maker_opts({});
500
501   if(keys %attrs) {
502     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
503       if(my $value = delete $attrs{$storage_opt}) {
504         $self->$storage_opt($value);
505       }
506     }
507     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
508       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
509         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
510       }
511     }
512   }
513
514   if (ref $args[0] eq 'CODE') {
515     # _connect() never looks past $args[0] in this case
516     %attrs = ()
517   } else {
518     %attrs = (
519       %{ $self->_default_dbi_connect_attributes || {} },
520       %attrs,
521     );
522   }
523
524   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
525   $self->_connect_info;
526 }
527
528 sub _default_dbi_connect_attributes {
529   return {
530     AutoCommit => 1,
531     RaiseError => 1,
532     PrintError => 0,
533   };
534 }
535
536 =head2 on_connect_do
537
538 This method is deprecated in favour of setting via L</connect_info>.
539
540 =cut
541
542 =head2 on_disconnect_do
543
544 This method is deprecated in favour of setting via L</connect_info>.
545
546 =cut
547
548 sub _parse_connect_do {
549   my ($self, $type) = @_;
550
551   my $val = $self->$type;
552   return () if not defined $val;
553
554   my @res;
555
556   if (not ref($val)) {
557     push @res, [ 'do_sql', $val ];
558   } elsif (ref($val) eq 'CODE') {
559     push @res, $val;
560   } elsif (ref($val) eq 'ARRAY') {
561     push @res, map { [ 'do_sql', $_ ] } @$val;
562   } else {
563     $self->throw_exception("Invalid type for $type: ".ref($val));
564   }
565
566   return \@res;
567 }
568
569 =head2 dbh_do
570
571 Arguments: ($subref | $method_name), @extra_coderef_args?
572
573 Execute the given $subref or $method_name using the new exception-based
574 connection management.
575
576 The first two arguments will be the storage object that C<dbh_do> was called
577 on and a database handle to use.  Any additional arguments will be passed
578 verbatim to the called subref as arguments 2 and onwards.
579
580 Using this (instead of $self->_dbh or $self->dbh) ensures correct
581 exception handling and reconnection (or failover in future subclasses).
582
583 Your subref should have no side-effects outside of the database, as
584 there is the potential for your subref to be partially double-executed
585 if the database connection was stale/dysfunctional.
586
587 Example:
588
589   my @stuff = $schema->storage->dbh_do(
590     sub {
591       my ($storage, $dbh, @cols) = @_;
592       my $cols = join(q{, }, @cols);
593       $dbh->selectrow_array("SELECT $cols FROM foo");
594     },
595     @column_list
596   );
597
598 =cut
599
600 sub dbh_do {
601   my $self = shift;
602   my $code = shift;
603
604   my $dbh = $self->_get_dbh;
605
606   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
607       || $self->{transaction_depth};
608
609   local $self->{_in_dbh_do} = 1;
610
611   my @result;
612   my $want_array = wantarray;
613
614   eval {
615
616     if($want_array) {
617         @result = $self->$code($dbh, @_);
618     }
619     elsif(defined $want_array) {
620         $result[0] = $self->$code($dbh, @_);
621     }
622     else {
623         $self->$code($dbh, @_);
624     }
625   };
626
627   # ->connected might unset $@ - copy
628   my $exception = $@;
629   if(!$exception) { return $want_array ? @result : $result[0] }
630
631   $self->throw_exception($exception) if $self->connected;
632
633   # We were not connected - reconnect and retry, but let any
634   #  exception fall right through this time
635   carp "Retrying $code after catching disconnected exception: $exception"
636     if $ENV{DBIC_DBIRETRY_DEBUG};
637   $self->_populate_dbh;
638   $self->$code($self->_dbh, @_);
639 }
640
641 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
642 # It also informs dbh_do to bypass itself while under the direction of txn_do,
643 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
644 sub txn_do {
645   my $self = shift;
646   my $coderef = shift;
647
648   ref $coderef eq 'CODE' or $self->throw_exception
649     ('$coderef must be a CODE reference');
650
651   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
652
653   local $self->{_in_dbh_do} = 1;
654
655   my @result;
656   my $want_array = wantarray;
657
658   my $tried = 0;
659   while(1) {
660     eval {
661       $self->_get_dbh;
662
663       $self->txn_begin;
664       if($want_array) {
665           @result = $coderef->(@_);
666       }
667       elsif(defined $want_array) {
668           $result[0] = $coderef->(@_);
669       }
670       else {
671           $coderef->(@_);
672       }
673       $self->txn_commit;
674     };
675
676     # ->connected might unset $@ - copy
677     my $exception = $@;
678     if(!$exception) { return $want_array ? @result : $result[0] }
679
680     if($tried++ || $self->connected) {
681       eval { $self->txn_rollback };
682       my $rollback_exception = $@;
683       if($rollback_exception) {
684         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
685         $self->throw_exception($exception)  # propagate nested rollback
686           if $rollback_exception =~ /$exception_class/;
687
688         $self->throw_exception(
689           "Transaction aborted: ${exception}. "
690           . "Rollback failed: ${rollback_exception}"
691         );
692       }
693       $self->throw_exception($exception)
694     }
695
696     # We were not connected, and was first try - reconnect and retry
697     # via the while loop
698     carp "Retrying $coderef after catching disconnected exception: $exception"
699       if $ENV{DBIC_DBIRETRY_DEBUG};
700     $self->_populate_dbh;
701   }
702 }
703
704 =head2 disconnect
705
706 Our C<disconnect> method also performs a rollback first if the
707 database is not in C<AutoCommit> mode.
708
709 =cut
710
711 sub disconnect {
712   my ($self) = @_;
713
714   if( $self->_dbh ) {
715     my @actions;
716
717     push @actions, ( $self->on_disconnect_call || () );
718     push @actions, $self->_parse_connect_do ('on_disconnect_do');
719
720     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
721
722     $self->_dbh_rollback unless $self->_dbh_autocommit;
723
724     $self->_dbh->disconnect;
725     $self->_dbh(undef);
726     $self->{_dbh_gen}++;
727   }
728 }
729
730 =head2 with_deferred_fk_checks
731
732 =over 4
733
734 =item Arguments: C<$coderef>
735
736 =item Return Value: The return value of $coderef
737
738 =back
739
740 Storage specific method to run the code ref with FK checks deferred or
741 in MySQL's case disabled entirely.
742
743 =cut
744
745 # Storage subclasses should override this
746 sub with_deferred_fk_checks {
747   my ($self, $sub) = @_;
748   $sub->();
749 }
750
751 =head2 connected
752
753 =over
754
755 =item Arguments: none
756
757 =item Return Value: 1|0
758
759 =back
760
761 Verifies that the the current database handle is active and ready to execute
762 an SQL statement (i.e. the connection did not get stale, server is still
763 answering, etc.) This method is used internally by L</dbh>.
764
765 =cut
766
767 sub connected {
768   my $self = shift;
769   return 0 unless $self->_seems_connected;
770
771   #be on the safe side
772   local $self->_dbh->{RaiseError} = 1;
773
774   return $self->_ping;
775 }
776
777 sub _seems_connected {
778   my $self = shift;
779
780   my $dbh = $self->_dbh
781     or return 0;
782
783   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
784     $self->_dbh(undef);
785     $self->{_dbh_gen}++;
786     return 0;
787   }
788   else {
789     $self->_verify_pid;
790     return 0 if !$self->_dbh;
791   }
792
793   return $dbh->FETCH('Active');
794 }
795
796 sub _ping {
797   my $self = shift;
798
799   my $dbh = $self->_dbh or return 0;
800
801   return $dbh->ping;
802 }
803
804 # handle pid changes correctly
805 #  NOTE: assumes $self->_dbh is a valid $dbh
806 sub _verify_pid {
807   my ($self) = @_;
808
809   return if defined $self->_conn_pid && $self->_conn_pid == $$;
810
811   $self->_dbh->{InactiveDestroy} = 1;
812   $self->_dbh(undef);
813   $self->{_dbh_gen}++;
814
815   return;
816 }
817
818 sub ensure_connected {
819   my ($self) = @_;
820
821   unless ($self->connected) {
822     $self->_populate_dbh;
823   }
824 }
825
826 =head2 dbh
827
828 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
829 is guaranteed to be healthy by implicitly calling L</connected>, and if
830 necessary performing a reconnection before returning. Keep in mind that this
831 is very B<expensive> on some database engines. Consider using L<dbh_do>
832 instead.
833
834 =cut
835
836 sub dbh {
837   my ($self) = @_;
838
839   if (not $self->_dbh) {
840     $self->_populate_dbh;
841   } else {
842     $self->ensure_connected;
843   }
844   return $self->_dbh;
845 }
846
847 # this is the internal "get dbh or connect (don't check)" method
848 sub _get_dbh {
849   my $self = shift;
850   $self->_verify_pid if $self->_dbh;
851   $self->_populate_dbh unless $self->_dbh;
852   return $self->_dbh;
853 }
854
855 sub _sql_maker_args {
856     my ($self) = @_;
857
858     return (
859       bindtype=>'columns',
860       array_datatypes => 1,
861       limit_dialect => $self->_get_dbh,
862       %{$self->_sql_maker_opts}
863     );
864 }
865
866 sub sql_maker {
867   my ($self) = @_;
868   unless ($self->_sql_maker) {
869     my $sql_maker_class = $self->sql_maker_class;
870     $self->ensure_class_loaded ($sql_maker_class);
871     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
872   }
873   return $self->_sql_maker;
874 }
875
876 # nothing to do by default
877 sub _rebless {}
878 sub _init {}
879
880 sub _populate_dbh {
881   my ($self) = @_;
882
883   my @info = @{$self->_dbi_connect_info || []};
884   $self->_dbh(undef); # in case ->connected failed we might get sent here
885   $self->_dbh($self->_connect(@info));
886
887   $self->_conn_pid($$);
888   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
889
890   $self->_determine_driver;
891
892   # Always set the transaction depth on connect, since
893   #  there is no transaction in progress by definition
894   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
895
896   $self->_run_connection_actions unless $self->{_in_determine_driver};
897 }
898
899 sub _run_connection_actions {
900   my $self = shift;
901   my @actions;
902
903   push @actions, ( $self->on_connect_call || () );
904   push @actions, $self->_parse_connect_do ('on_connect_do');
905
906   $self->_do_connection_actions(connect_call_ => $_) for @actions;
907 }
908
909 sub _determine_driver {
910   my ($self) = @_;
911
912   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
913     my $started_connected = 0;
914     local $self->{_in_determine_driver} = 1;
915
916     if (ref($self) eq __PACKAGE__) {
917       my $driver;
918       if ($self->_dbh) { # we are connected
919         $driver = $self->_dbh->{Driver}{Name};
920         $started_connected = 1;
921       } else {
922         # if connect_info is a CODEREF, we have no choice but to connect
923         if (ref $self->_dbi_connect_info->[0] &&
924             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
925           $self->_populate_dbh;
926           $driver = $self->_dbh->{Driver}{Name};
927         }
928         else {
929           # try to use dsn to not require being connected, the driver may still
930           # force a connection in _rebless to determine version
931           ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
932         }
933       }
934
935       my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
936       if ($self->load_optional_class($storage_class)) {
937         mro::set_mro($storage_class, 'c3');
938         bless $self, $storage_class;
939         $self->_rebless();
940       }
941     }
942
943     $self->_driver_determined(1);
944
945     $self->_init; # run driver-specific initializations
946
947     $self->_run_connection_actions
948         if !$started_connected && defined $self->_dbh;
949   }
950 }
951
952 sub _do_connection_actions {
953   my $self          = shift;
954   my $method_prefix = shift;
955   my $call          = shift;
956
957   if (not ref($call)) {
958     my $method = $method_prefix . $call;
959     $self->$method(@_);
960   } elsif (ref($call) eq 'CODE') {
961     $self->$call(@_);
962   } elsif (ref($call) eq 'ARRAY') {
963     if (ref($call->[0]) ne 'ARRAY') {
964       $self->_do_connection_actions($method_prefix, $_) for @$call;
965     } else {
966       $self->_do_connection_actions($method_prefix, @$_) for @$call;
967     }
968   } else {
969     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
970   }
971
972   return $self;
973 }
974
975 sub connect_call_do_sql {
976   my $self = shift;
977   $self->_do_query(@_);
978 }
979
980 sub disconnect_call_do_sql {
981   my $self = shift;
982   $self->_do_query(@_);
983 }
984
985 # override in db-specific backend when necessary
986 sub connect_call_datetime_setup { 1 }
987
988 sub _do_query {
989   my ($self, $action) = @_;
990
991   if (ref $action eq 'CODE') {
992     $action = $action->($self);
993     $self->_do_query($_) foreach @$action;
994   }
995   else {
996     # Most debuggers expect ($sql, @bind), so we need to exclude
997     # the attribute hash which is the second argument to $dbh->do
998     # furthermore the bind values are usually to be presented
999     # as named arrayref pairs, so wrap those here too
1000     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1001     my $sql = shift @do_args;
1002     my $attrs = shift @do_args;
1003     my @bind = map { [ undef, $_ ] } @do_args;
1004
1005     $self->_query_start($sql, @bind);
1006     $self->_get_dbh->do($sql, $attrs, @do_args);
1007     $self->_query_end($sql, @bind);
1008   }
1009
1010   return $self;
1011 }
1012
1013 sub _connect {
1014   my ($self, @info) = @_;
1015
1016   $self->throw_exception("You failed to provide any connection info")
1017     if !@info;
1018
1019   my ($old_connect_via, $dbh);
1020
1021   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1022     $old_connect_via = $DBI::connect_via;
1023     $DBI::connect_via = 'connect';
1024   }
1025
1026   eval {
1027     if(ref $info[0] eq 'CODE') {
1028        $dbh = &{$info[0]}
1029     }
1030     else {
1031        $dbh = DBI->connect(@info);
1032     }
1033
1034     if($dbh && !$self->unsafe) {
1035       my $weak_self = $self;
1036       Scalar::Util::weaken($weak_self);
1037       $dbh->{HandleError} = sub {
1038           if ($weak_self) {
1039             $weak_self->throw_exception("DBI Exception: $_[0]");
1040           }
1041           else {
1042             # the handler may be invoked by something totally out of
1043             # the scope of DBIC
1044             croak ("DBI Exception: $_[0]");
1045           }
1046       };
1047       $dbh->{ShowErrorStatement} = 1;
1048       $dbh->{RaiseError} = 1;
1049       $dbh->{PrintError} = 0;
1050     }
1051   };
1052
1053   $DBI::connect_via = $old_connect_via if $old_connect_via;
1054
1055   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1056     if !$dbh || $@;
1057
1058   $self->_dbh_autocommit($dbh->{AutoCommit});
1059
1060   $dbh;
1061 }
1062
1063 sub svp_begin {
1064   my ($self, $name) = @_;
1065
1066   $name = $self->_svp_generate_name
1067     unless defined $name;
1068
1069   $self->throw_exception ("You can't use savepoints outside a transaction")
1070     if $self->{transaction_depth} == 0;
1071
1072   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1073     unless $self->can('_svp_begin');
1074
1075   push @{ $self->{savepoints} }, $name;
1076
1077   $self->debugobj->svp_begin($name) if $self->debug;
1078
1079   return $self->_svp_begin($name);
1080 }
1081
1082 sub svp_release {
1083   my ($self, $name) = @_;
1084
1085   $self->throw_exception ("You can't use savepoints outside a transaction")
1086     if $self->{transaction_depth} == 0;
1087
1088   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1089     unless $self->can('_svp_release');
1090
1091   if (defined $name) {
1092     $self->throw_exception ("Savepoint '$name' does not exist")
1093       unless grep { $_ eq $name } @{ $self->{savepoints} };
1094
1095     # Dig through the stack until we find the one we are releasing.  This keeps
1096     # the stack up to date.
1097     my $svp;
1098
1099     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1100   } else {
1101     $name = pop @{ $self->{savepoints} };
1102   }
1103
1104   $self->debugobj->svp_release($name) if $self->debug;
1105
1106   return $self->_svp_release($name);
1107 }
1108
1109 sub svp_rollback {
1110   my ($self, $name) = @_;
1111
1112   $self->throw_exception ("You can't use savepoints outside a transaction")
1113     if $self->{transaction_depth} == 0;
1114
1115   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1116     unless $self->can('_svp_rollback');
1117
1118   if (defined $name) {
1119       # If they passed us a name, verify that it exists in the stack
1120       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1121           $self->throw_exception("Savepoint '$name' does not exist!");
1122       }
1123
1124       # Dig through the stack until we find the one we are releasing.  This keeps
1125       # the stack up to date.
1126       while(my $s = pop(@{ $self->{savepoints} })) {
1127           last if($s eq $name);
1128       }
1129       # Add the savepoint back to the stack, as a rollback doesn't remove the
1130       # named savepoint, only everything after it.
1131       push(@{ $self->{savepoints} }, $name);
1132   } else {
1133       # We'll assume they want to rollback to the last savepoint
1134       $name = $self->{savepoints}->[-1];
1135   }
1136
1137   $self->debugobj->svp_rollback($name) if $self->debug;
1138
1139   return $self->_svp_rollback($name);
1140 }
1141
1142 sub _svp_generate_name {
1143     my ($self) = @_;
1144
1145     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1146 }
1147
1148 sub txn_begin {
1149   my $self = shift;
1150   if($self->{transaction_depth} == 0) {
1151     $self->debugobj->txn_begin()
1152       if $self->debug;
1153     $self->_dbh_begin_work;
1154   }
1155   elsif ($self->auto_savepoint) {
1156     $self->svp_begin;
1157   }
1158   $self->{transaction_depth}++;
1159 }
1160
1161 sub _dbh_begin_work {
1162   my $self = shift;
1163
1164   # if the user is utilizing txn_do - good for him, otherwise we need to
1165   # ensure that the $dbh is healthy on BEGIN.
1166   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1167   # will be replaced by a failure of begin_work itself (which will be
1168   # then retried on reconnect)
1169   if ($self->{_in_dbh_do}) {
1170     $self->_dbh->begin_work;
1171   } else {
1172     $self->dbh_do(sub { $_[1]->begin_work });
1173   }
1174 }
1175
1176 sub txn_commit {
1177   my $self = shift;
1178   if ($self->{transaction_depth} == 1) {
1179     $self->debugobj->txn_commit()
1180       if ($self->debug);
1181     $self->_dbh_commit;
1182     $self->{transaction_depth} = 0
1183       if $self->_dbh_autocommit;
1184   }
1185   elsif($self->{transaction_depth} > 1) {
1186     $self->{transaction_depth}--;
1187     $self->svp_release
1188       if $self->auto_savepoint;
1189   }
1190 }
1191
1192 sub _dbh_commit {
1193   my $self = shift;
1194   my $dbh  = $self->_dbh
1195     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1196   $dbh->commit;
1197 }
1198
1199 sub txn_rollback {
1200   my $self = shift;
1201   my $dbh = $self->_dbh;
1202   eval {
1203     if ($self->{transaction_depth} == 1) {
1204       $self->debugobj->txn_rollback()
1205         if ($self->debug);
1206       $self->{transaction_depth} = 0
1207         if $self->_dbh_autocommit;
1208       $self->_dbh_rollback;
1209     }
1210     elsif($self->{transaction_depth} > 1) {
1211       $self->{transaction_depth}--;
1212       if ($self->auto_savepoint) {
1213         $self->svp_rollback;
1214         $self->svp_release;
1215       }
1216     }
1217     else {
1218       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1219     }
1220   };
1221   if ($@) {
1222     my $error = $@;
1223     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1224     $error =~ /$exception_class/ and $self->throw_exception($error);
1225     # ensure that a failed rollback resets the transaction depth
1226     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1227     $self->throw_exception($error);
1228   }
1229 }
1230
1231 sub _dbh_rollback {
1232   my $self = shift;
1233   my $dbh  = $self->_dbh
1234     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1235   $dbh->rollback;
1236 }
1237
1238 # This used to be the top-half of _execute.  It was split out to make it
1239 #  easier to override in NoBindVars without duping the rest.  It takes up
1240 #  all of _execute's args, and emits $sql, @bind.
1241 sub _prep_for_execute {
1242   my ($self, $op, $extra_bind, $ident, $args) = @_;
1243
1244   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1245     $ident = $ident->from();
1246   }
1247
1248   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1249
1250   unshift(@bind,
1251     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1252       if $extra_bind;
1253   return ($sql, \@bind);
1254 }
1255
1256
1257 sub _fix_bind_params {
1258     my ($self, @bind) = @_;
1259
1260     ### Turn @bind from something like this:
1261     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1262     ### to this:
1263     ###   ( "'1'", "'1'", "'3'" )
1264     return
1265         map {
1266             if ( defined( $_ && $_->[1] ) ) {
1267                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1268             }
1269             else { q{'NULL'}; }
1270         } @bind;
1271 }
1272
1273 sub _query_start {
1274     my ( $self, $sql, @bind ) = @_;
1275
1276     if ( $self->debug ) {
1277         @bind = $self->_fix_bind_params(@bind);
1278
1279         $self->debugobj->query_start( $sql, @bind );
1280     }
1281 }
1282
1283 sub _query_end {
1284     my ( $self, $sql, @bind ) = @_;
1285
1286     if ( $self->debug ) {
1287         @bind = $self->_fix_bind_params(@bind);
1288         $self->debugobj->query_end( $sql, @bind );
1289     }
1290 }
1291
1292 sub _dbh_execute {
1293   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1294
1295   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1296
1297   $self->_query_start( $sql, @$bind );
1298
1299   my $sth = $self->sth($sql,$op);
1300
1301   my $placeholder_index = 1;
1302
1303   foreach my $bound (@$bind) {
1304     my $attributes = {};
1305     my($column_name, @data) = @$bound;
1306
1307     if ($bind_attributes) {
1308       $attributes = $bind_attributes->{$column_name}
1309       if defined $bind_attributes->{$column_name};
1310     }
1311
1312     foreach my $data (@data) {
1313       my $ref = ref $data;
1314       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1315
1316       $sth->bind_param($placeholder_index, $data, $attributes);
1317       $placeholder_index++;
1318     }
1319   }
1320
1321   # Can this fail without throwing an exception anyways???
1322   my $rv = $sth->execute();
1323   $self->throw_exception($sth->errstr) if !$rv;
1324
1325   $self->_query_end( $sql, @$bind );
1326
1327   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1328 }
1329
1330 sub _execute {
1331     my $self = shift;
1332     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1333 }
1334
1335 sub insert {
1336   my ($self, $source, $to_insert) = @_;
1337
1338   my $ident = $source->from;
1339   my $bind_attributes = $self->source_bind_attributes($source);
1340
1341   my $updated_cols = {};
1342
1343   foreach my $col ( $source->columns ) {
1344     if ( !defined $to_insert->{$col} ) {
1345       my $col_info = $source->column_info($col);
1346
1347       if ( $col_info->{auto_nextval} ) {
1348         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1349           'nextval',
1350           $col_info->{sequence} ||
1351             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
1352         );
1353       }
1354     }
1355   }
1356
1357   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1358
1359   return $updated_cols;
1360 }
1361
1362 ## Still not quite perfect, and EXPERIMENTAL
1363 ## Currently it is assumed that all values passed will be "normal", i.e. not
1364 ## scalar refs, or at least, all the same type as the first set, the statement is
1365 ## only prepped once.
1366 sub insert_bulk {
1367   my ($self, $source, $cols, $data) = @_;
1368
1369   my %colvalues;
1370   @colvalues{@$cols} = (0..$#$cols);
1371
1372   for my $i (0..$#$cols) {
1373     my $first_val = $data->[0][$i];
1374     next unless ref $first_val eq 'SCALAR';
1375
1376     $colvalues{ $cols->[$i] } = $first_val;
1377   }
1378
1379   # check for bad data and stringify stringifiable objects
1380   my $bad_slice = sub {
1381     my ($msg, $col_idx, $slice_idx) = @_;
1382     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1383       $msg,
1384       $cols->[$col_idx],
1385       do {
1386         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1387         Data::Dumper::Concise::Dumper({
1388           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1389         }),
1390       }
1391     );
1392   };
1393
1394   for my $datum_idx (0..$#$data) {
1395     my $datum = $data->[$datum_idx];
1396
1397     for my $col_idx (0..$#$cols) {
1398       my $val            = $datum->[$col_idx];
1399       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1400       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1401
1402       if ($is_literal_sql) {
1403         if (not ref $val) {
1404           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1405         }
1406         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1407           $bad_slice->("$reftype reference found where literal SQL expected",
1408             $col_idx, $datum_idx);
1409         }
1410         elsif ($$val ne $$sqla_bind){
1411           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1412             $col_idx, $datum_idx);
1413         }
1414       }
1415       elsif (my $reftype = ref $val) {
1416         require overload;
1417         if (overload::Method($val, '""')) {
1418           $datum->[$col_idx] = "".$val;
1419         }
1420         else {
1421           $bad_slice->("$reftype reference found where bind expected",
1422             $col_idx, $datum_idx);
1423         }
1424       }
1425     }
1426   }
1427
1428   my ($sql, $bind) = $self->_prep_for_execute (
1429     'insert', undef, $source, [\%colvalues]
1430   );
1431   my @bind = @$bind;
1432
1433   my $empty_bind = 1 if (not @bind) &&
1434     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1435
1436   if ((not @bind) && (not $empty_bind)) {
1437     $self->throw_exception(
1438       'Cannot insert_bulk without support for placeholders'
1439     );
1440   }
1441
1442   $self->_query_start( $sql, ['__BULK__'] );
1443   my $sth = $self->sth($sql);
1444
1445   my $rv = do {
1446     if ($empty_bind) {
1447       # bind_param_array doesn't work if there are no binds
1448       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1449     }
1450     else {
1451 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1452       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1453     }
1454   };
1455
1456   $self->_query_end( $sql, ['__BULK__'] );
1457
1458   return (wantarray ? ($rv, $sth, @bind) : $rv);
1459 }
1460
1461 sub _execute_array {
1462   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1463
1464   my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
1465
1466   ## This must be an arrayref, else nothing works!
1467   my $tuple_status = [];
1468
1469   ## Get the bind_attributes, if any exist
1470   my $bind_attributes = $self->source_bind_attributes($source);
1471
1472   ## Bind the values and execute
1473   my $placeholder_index = 1;
1474
1475   foreach my $bound (@$bind) {
1476
1477     my $attributes = {};
1478     my ($column_name, $data_index) = @$bound;
1479
1480     if( $bind_attributes ) {
1481       $attributes = $bind_attributes->{$column_name}
1482       if defined $bind_attributes->{$column_name};
1483     }
1484
1485     my @data = map { $_->[$data_index] } @$data;
1486
1487     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1488     $placeholder_index++;
1489   }
1490
1491   my $rv = eval {
1492     $self->_dbh_execute_array($sth, $tuple_status, @extra);
1493   };
1494   my $err = $@ || $sth->errstr;
1495
1496 # Statement must finish even if there was an exception.
1497   eval { $sth->finish };
1498   $err = $@ unless $err;
1499
1500   if ($err) {
1501     my $i = 0;
1502     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1503
1504     $self->throw_exception("Unexpected populate error: $err")
1505       if ($i > $#$tuple_status);
1506
1507     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1508       ($tuple_status->[$i][1] || $err),
1509       Data::Dumper::Concise::Dumper({
1510         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1511       }),
1512     );
1513   }
1514
1515   $guard->commit if $guard;
1516
1517   return $rv;
1518 }
1519
1520 sub _dbh_execute_array {
1521     my ($self, $sth, $tuple_status, @extra) = @_;
1522
1523     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1524 }
1525
1526 sub _dbh_execute_inserts_with_no_binds {
1527   my ($self, $sth, $count) = @_;
1528
1529   my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
1530
1531   eval {
1532     my $dbh = $self->_get_dbh;
1533     local $dbh->{RaiseError} = 1;
1534     local $dbh->{PrintError} = 0;
1535
1536     $sth->execute foreach 1..$count;
1537   };
1538   my $exception = $@;
1539
1540 # Make sure statement is finished even if there was an exception.
1541   eval { $sth->finish };
1542   $exception = $@ unless $exception;
1543
1544   $self->throw_exception($exception) if $exception;
1545
1546   $guard->commit if $guard;
1547
1548   return $count;
1549 }
1550
1551 sub update {
1552   my ($self, $source, @args) = @_; 
1553
1554   my $bind_attrs = $self->source_bind_attributes($source);
1555
1556   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1557 }
1558
1559
1560 sub delete {
1561   my ($self, $source, @args) = @_;
1562
1563   my $bind_attrs = $self->source_bind_attributes($source);
1564
1565   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1566 }
1567
1568 # Most databases do not allow aliasing of tables in UPDATE/DELETE. Thus
1569 # a condition containing 'me' or other table prefixes will not work
1570 # at all. What this code tries to do (badly) is introspect the condition
1571 # and remove all column qualifiers. If it bails out early (returns undef)
1572 # the calling code should try another approach (e.g. a subquery)
1573 sub _strip_cond_qualifiers {
1574   my ($self, $where) = @_;
1575
1576   my $cond = {};
1577
1578   # No-op. No condition, we're updating/deleting everything
1579   return $cond unless $where;
1580
1581   if (ref $where eq 'ARRAY') {
1582     $cond = [
1583       map {
1584         my %hash;
1585         foreach my $key (keys %{$_}) {
1586           $key =~ /([^.]+)$/;
1587           $hash{$1} = $_->{$key};
1588         }
1589         \%hash;
1590       } @$where
1591     ];
1592   }
1593   elsif (ref $where eq 'HASH') {
1594     if ( (keys %$where) == 1 && ( (keys %{$where})[0] eq '-and' )) {
1595       $cond->{-and} = [];
1596       my @cond = @{$where->{-and}};
1597        for (my $i = 0; $i < @cond; $i++) {
1598         my $entry = $cond[$i];
1599         my $hash;
1600         if (ref $entry eq 'HASH') {
1601           $hash = $self->_strip_cond_qualifiers($entry);
1602         }
1603         else {
1604           $entry =~ /([^.]+)$/;
1605           $hash->{$1} = $cond[++$i];
1606         }
1607         push @{$cond->{-and}}, $hash;
1608       }
1609     }
1610     else {
1611       foreach my $key (keys %$where) {
1612         $key =~ /([^.]+)$/;
1613         $cond->{$1} = $where->{$key};
1614       }
1615     }
1616   }
1617   else {
1618     return undef;
1619   }
1620
1621   return $cond;
1622 }
1623
1624 # We were sent here because the $rs contains a complex search
1625 # which will require a subquery to select the correct rows
1626 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1627 #
1628 # Generating a single PK column subquery is trivial and supported
1629 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1630 # Look at _multipk_update_delete()
1631 sub _subq_update_delete {
1632   my $self = shift;
1633   my ($rs, $op, $values) = @_;
1634
1635   my $rsrc = $rs->result_source;
1636
1637   my @pcols = $rsrc->primary_columns;
1638
1639   if (@pcols == 1) {
1640     return $self->$op (
1641       $rsrc,
1642       $op eq 'update' ? $values : (),
1643       { $pcols[0] => { -in => $rs->as_query } },
1644     );
1645   }
1646
1647   else {
1648     return $self->_multipk_update_delete (@_);
1649   }
1650 }
1651
1652 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1653 # resultset update/delete involving subqueries. So by default resort
1654 # to simple (and inefficient) delete_all style per-row opearations,
1655 # while allowing specific storages to override this with a faster
1656 # implementation.
1657 #
1658 sub _multipk_update_delete {
1659   return shift->_per_row_update_delete (@_);
1660 }
1661
1662 # This is the default loop used to delete/update rows for multi PK
1663 # resultsets, and used by mysql exclusively (because it can't do anything
1664 # else).
1665 #
1666 # We do not use $row->$op style queries, because resultset update/delete
1667 # is not expected to cascade (this is what delete_all/update_all is for).
1668 #
1669 # There should be no race conditions as the entire operation is rolled
1670 # in a transaction.
1671 #
1672 sub _per_row_update_delete {
1673   my $self = shift;
1674   my ($rs, $op, $values) = @_;
1675
1676   my $rsrc = $rs->result_source;
1677   my @pcols = $rsrc->primary_columns;
1678
1679   my $guard = $self->txn_scope_guard;
1680
1681   # emulate the return value of $sth->execute for non-selects
1682   my $row_cnt = '0E0';
1683
1684   my $subrs_cur = $rs->cursor;
1685   while (my @pks = $subrs_cur->next) {
1686
1687     my $cond;
1688     for my $i (0.. $#pcols) {
1689       $cond->{$pcols[$i]} = $pks[$i];
1690     }
1691
1692     $self->$op (
1693       $rsrc,
1694       $op eq 'update' ? $values : (),
1695       $cond,
1696     );
1697
1698     $row_cnt++;
1699   }
1700
1701   $guard->commit;
1702
1703   return $row_cnt;
1704 }
1705
1706 sub _select {
1707   my $self = shift;
1708
1709   # localization is neccessary as
1710   # 1) there is no infrastructure to pass this around before SQLA2
1711   # 2) _select_args sets it and _prep_for_execute consumes it
1712   my $sql_maker = $self->sql_maker;
1713   local $sql_maker->{_dbic_rs_attrs};
1714
1715   return $self->_execute($self->_select_args(@_));
1716 }
1717
1718 sub _select_args_to_query {
1719   my $self = shift;
1720
1721   # localization is neccessary as
1722   # 1) there is no infrastructure to pass this around before SQLA2
1723   # 2) _select_args sets it and _prep_for_execute consumes it
1724   my $sql_maker = $self->sql_maker;
1725   local $sql_maker->{_dbic_rs_attrs};
1726
1727   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1728   #  = $self->_select_args($ident, $select, $cond, $attrs);
1729   my ($op, $bind, $ident, $bind_attrs, @args) =
1730     $self->_select_args(@_);
1731
1732   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1733   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1734   $prepared_bind ||= [];
1735
1736   return wantarray
1737     ? ($sql, $prepared_bind, $bind_attrs)
1738     : \[ "($sql)", @$prepared_bind ]
1739   ;
1740 }
1741
1742 sub _select_args {
1743   my ($self, $ident, $select, $where, $attrs) = @_;
1744
1745   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1746
1747   my $sql_maker = $self->sql_maker;
1748   $sql_maker->{_dbic_rs_attrs} = {
1749     %$attrs,
1750     select => $select,
1751     from => $ident,
1752     where => $where,
1753     $rs_alias
1754       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1755       : ()
1756     ,
1757   };
1758
1759   # calculate bind_attrs before possible $ident mangling
1760   my $bind_attrs = {};
1761   for my $alias (keys %$alias2source) {
1762     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1763     for my $col (keys %$bindtypes) {
1764
1765       my $fqcn = join ('.', $alias, $col);
1766       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1767
1768       # Unqialified column names are nice, but at the same time can be
1769       # rather ambiguous. What we do here is basically go along with
1770       # the loop, adding an unqualified column slot to $bind_attrs,
1771       # alongside the fully qualified name. As soon as we encounter
1772       # another column by that name (which would imply another table)
1773       # we unset the unqualified slot and never add any info to it
1774       # to avoid erroneous type binding. If this happens the users
1775       # only choice will be to fully qualify his column name
1776
1777       if (exists $bind_attrs->{$col}) {
1778         $bind_attrs->{$col} = {};
1779       }
1780       else {
1781         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1782       }
1783     }
1784   }
1785
1786   # adjust limits
1787   if (
1788     $attrs->{software_limit}
1789       ||
1790     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1791   ) {
1792     $attrs->{software_limit} = 1;
1793   }
1794   else {
1795     $self->throw_exception("rows attribute must be positive if present")
1796       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1797
1798     # MySQL actually recommends this approach.  I cringe.
1799     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1800   }
1801
1802   my @limit;
1803
1804   # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
1805   # otherwise delegate the limiting to the storage, unless software limit was requested
1806   if (
1807     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1808        ||
1809     ( $attrs->{group_by} && @{$attrs->{group_by}} &&
1810       $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
1811   ) {
1812     ($ident, $select, $where, $attrs)
1813       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1814   }
1815   elsif (! $attrs->{software_limit} ) {
1816     push @limit, $attrs->{rows}, $attrs->{offset};
1817   }
1818
1819 ###
1820   # This would be the point to deflate anything found in $where
1821   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1822   # expect a row object. And all we have is a resultsource (it is trivial
1823   # to extract deflator coderefs via $alias2source above).
1824   #
1825   # I don't see a way forward other than changing the way deflators are
1826   # invoked, and that's just bad...
1827 ###
1828
1829   my $order = { map
1830     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1831     (qw/order_by group_by having/ )
1832   };
1833
1834   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1835 }
1836
1837 # Returns a counting SELECT for a simple count
1838 # query. Abstracted so that a storage could override
1839 # this to { count => 'firstcol' } or whatever makes
1840 # sense as a performance optimization
1841 sub _count_select {
1842   #my ($self, $source, $rs_attrs) = @_;
1843   return { count => '*' };
1844 }
1845
1846 # Returns a SELECT which will end up in the subselect
1847 # There may or may not be a group_by, as the subquery
1848 # might have been called to accomodate a limit
1849 #
1850 # Most databases would be happy with whatever ends up
1851 # here, but some choke in various ways.
1852 #
1853 sub _subq_count_select {
1854   my ($self, $source, $rs_attrs) = @_;
1855   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
1856
1857   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1858   return @pcols ? \@pcols : [ 1 ];
1859 }
1860
1861 sub source_bind_attributes {
1862   my ($self, $source) = @_;
1863
1864   my $bind_attributes;
1865   foreach my $column ($source->columns) {
1866
1867     my $data_type = $source->column_info($column)->{data_type} || '';
1868     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1869      if $data_type;
1870   }
1871
1872   return $bind_attributes;
1873 }
1874
1875 =head2 select
1876
1877 =over 4
1878
1879 =item Arguments: $ident, $select, $condition, $attrs
1880
1881 =back
1882
1883 Handle a SQL select statement.
1884
1885 =cut
1886
1887 sub select {
1888   my $self = shift;
1889   my ($ident, $select, $condition, $attrs) = @_;
1890   return $self->cursor_class->new($self, \@_, $attrs);
1891 }
1892
1893 sub select_single {
1894   my $self = shift;
1895   my ($rv, $sth, @bind) = $self->_select(@_);
1896   my @row = $sth->fetchrow_array;
1897   my @nextrow = $sth->fetchrow_array if @row;
1898   if(@row && @nextrow) {
1899     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1900   }
1901   # Need to call finish() to work round broken DBDs
1902   $sth->finish();
1903   return @row;
1904 }
1905
1906 =head2 sth
1907
1908 =over 4
1909
1910 =item Arguments: $sql
1911
1912 =back
1913
1914 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1915
1916 =cut
1917
1918 sub _dbh_sth {
1919   my ($self, $dbh, $sql) = @_;
1920
1921   # 3 is the if_active parameter which avoids active sth re-use
1922   my $sth = $self->disable_sth_caching
1923     ? $dbh->prepare($sql)
1924     : $dbh->prepare_cached($sql, {}, 3);
1925
1926   # XXX You would think RaiseError would make this impossible,
1927   #  but apparently that's not true :(
1928   $self->throw_exception($dbh->errstr) if !$sth;
1929
1930   $sth;
1931 }
1932
1933 sub sth {
1934   my ($self, $sql) = @_;
1935   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
1936 }
1937
1938 sub _dbh_columns_info_for {
1939   my ($self, $dbh, $table) = @_;
1940
1941   if ($dbh->can('column_info')) {
1942     my %result;
1943     eval {
1944       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1945       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1946       $sth->execute();
1947       while ( my $info = $sth->fetchrow_hashref() ){
1948         my %column_info;
1949         $column_info{data_type}   = $info->{TYPE_NAME};
1950         $column_info{size}      = $info->{COLUMN_SIZE};
1951         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1952         $column_info{default_value} = $info->{COLUMN_DEF};
1953         my $col_name = $info->{COLUMN_NAME};
1954         $col_name =~ s/^\"(.*)\"$/$1/;
1955
1956         $result{$col_name} = \%column_info;
1957       }
1958     };
1959     return \%result if !$@ && scalar keys %result;
1960   }
1961
1962   my %result;
1963   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1964   $sth->execute;
1965   my @columns = @{$sth->{NAME_lc}};
1966   for my $i ( 0 .. $#columns ){
1967     my %column_info;
1968     $column_info{data_type} = $sth->{TYPE}->[$i];
1969     $column_info{size} = $sth->{PRECISION}->[$i];
1970     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1971
1972     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1973       $column_info{data_type} = $1;
1974       $column_info{size}    = $2;
1975     }
1976
1977     $result{$columns[$i]} = \%column_info;
1978   }
1979   $sth->finish;
1980
1981   foreach my $col (keys %result) {
1982     my $colinfo = $result{$col};
1983     my $type_num = $colinfo->{data_type};
1984     my $type_name;
1985     if(defined $type_num && $dbh->can('type_info')) {
1986       my $type_info = $dbh->type_info($type_num);
1987       $type_name = $type_info->{TYPE_NAME} if $type_info;
1988       $colinfo->{data_type} = $type_name if $type_name;
1989     }
1990   }
1991
1992   return \%result;
1993 }
1994
1995 sub columns_info_for {
1996   my ($self, $table) = @_;
1997   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
1998 }
1999
2000 =head2 last_insert_id
2001
2002 Return the row id of the last insert.
2003
2004 =cut
2005
2006 sub _dbh_last_insert_id {
2007     # All Storage's need to register their own _dbh_last_insert_id
2008     # the old SQLite-based method was highly inappropriate
2009
2010     my $self = shift;
2011     my $class = ref $self;
2012     $self->throw_exception (<<EOE);
2013
2014 No _dbh_last_insert_id() method found in $class.
2015 Since the method of obtaining the autoincrement id of the last insert
2016 operation varies greatly between different databases, this method must be
2017 individually implemented for every storage class.
2018 EOE
2019 }
2020
2021 sub last_insert_id {
2022   my $self = shift;
2023   $self->_dbh_last_insert_id ($self->_dbh, @_);
2024 }
2025
2026 =head2 _native_data_type
2027
2028 =over 4
2029
2030 =item Arguments: $type_name
2031
2032 =back
2033
2034 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2035 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2036 L<::Sybase|DBIx::Class::Storage::DBI::Sybase>.
2037
2038 The default implementation returns C<undef>, implement in your Storage driver if
2039 you need this functionality.
2040
2041 Should map types from other databases to the native RDBMS type, for example
2042 C<VARCHAR2> to C<VARCHAR>.
2043
2044 Types with modifiers should map to the underlying data type. For example,
2045 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2046
2047 Composite types should map to the container type, for example
2048 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2049
2050 =cut
2051
2052 sub _native_data_type {
2053   #my ($self, $data_type) = @_;
2054   return undef
2055 }
2056
2057 # Check if placeholders are supported at all
2058 sub _placeholders_supported {
2059   my $self = shift;
2060   my $dbh  = $self->_get_dbh;
2061
2062   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2063   # but it is inaccurate more often than not
2064   eval {
2065     local $dbh->{PrintError} = 0;
2066     local $dbh->{RaiseError} = 1;
2067     $dbh->do('select ?', {}, 1);
2068   };
2069   return $@ ? 0 : 1;
2070 }
2071
2072 # Check if placeholders bound to non-string types throw exceptions
2073 #
2074 sub _typeless_placeholders_supported {
2075   my $self = shift;
2076   my $dbh  = $self->_get_dbh;
2077
2078   eval {
2079     local $dbh->{PrintError} = 0;
2080     local $dbh->{RaiseError} = 1;
2081     # this specifically tests a bind that is NOT a string
2082     $dbh->do('select 1 where 1 = ?', {}, 1);
2083   };
2084   return $@ ? 0 : 1;
2085 }
2086
2087 =head2 sqlt_type
2088
2089 Returns the database driver name.
2090
2091 =cut
2092
2093 sub sqlt_type {
2094   shift->_get_dbh->{Driver}->{Name};
2095 }
2096
2097 =head2 bind_attribute_by_data_type
2098
2099 Given a datatype from column info, returns a database specific bind
2100 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2101 let the database planner just handle it.
2102
2103 Generally only needed for special case column types, like bytea in postgres.
2104
2105 =cut
2106
2107 sub bind_attribute_by_data_type {
2108     return;
2109 }
2110
2111 =head2 is_datatype_numeric
2112
2113 Given a datatype from column_info, returns a boolean value indicating if
2114 the current RDBMS considers it a numeric value. This controls how
2115 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2116 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2117 be performed instead of the usual C<eq>.
2118
2119 =cut
2120
2121 sub is_datatype_numeric {
2122   my ($self, $dt) = @_;
2123
2124   return 0 unless $dt;
2125
2126   return $dt =~ /^ (?:
2127     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2128   ) $/ix;
2129 }
2130
2131
2132 =head2 create_ddl_dir (EXPERIMENTAL)
2133
2134 =over 4
2135
2136 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2137
2138 =back
2139
2140 Creates a SQL file based on the Schema, for each of the specified
2141 database engines in C<\@databases> in the given directory.
2142 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2143
2144 Given a previous version number, this will also create a file containing
2145 the ALTER TABLE statements to transform the previous schema into the
2146 current one. Note that these statements may contain C<DROP TABLE> or
2147 C<DROP COLUMN> statements that can potentially destroy data.
2148
2149 The file names are created using the C<ddl_filename> method below, please
2150 override this method in your schema if you would like a different file
2151 name format. For the ALTER file, the same format is used, replacing
2152 $version in the name with "$preversion-$version".
2153
2154 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2155 The most common value for this would be C<< { add_drop_table => 1 } >>
2156 to have the SQL produced include a C<DROP TABLE> statement for each table
2157 created. For quoting purposes supply C<quote_table_names> and
2158 C<quote_field_names>.
2159
2160 If no arguments are passed, then the following default values are assumed:
2161
2162 =over 4
2163
2164 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2165
2166 =item version    - $schema->schema_version
2167
2168 =item directory  - './'
2169
2170 =item preversion - <none>
2171
2172 =back
2173
2174 By default, C<\%sqlt_args> will have
2175
2176  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2177
2178 merged with the hash passed in. To disable any of those features, pass in a
2179 hashref like the following
2180
2181  { ignore_constraint_names => 0, # ... other options }
2182
2183
2184 Note that this feature is currently EXPERIMENTAL and may not work correctly
2185 across all databases, or fully handle complex relationships.
2186
2187 WARNING: Please check all SQL files created, before applying them.
2188
2189 =cut
2190
2191 sub create_ddl_dir {
2192   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2193
2194   if(!$dir || !-d $dir) {
2195     carp "No directory given, using ./\n";
2196     $dir = "./";
2197   }
2198   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2199   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2200
2201   my $schema_version = $schema->schema_version || '1.x';
2202   $version ||= $schema_version;
2203
2204   $sqltargs = {
2205     add_drop_table => 1,
2206     ignore_constraint_names => 1,
2207     ignore_index_names => 1,
2208     %{$sqltargs || {}}
2209   };
2210
2211   $self->throw_exception("Can't create a ddl file without SQL::Translator: " . $self->_sqlt_version_error)
2212     if !$self->_sqlt_version_ok;
2213
2214   my $sqlt = SQL::Translator->new( $sqltargs );
2215
2216   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2217   my $sqlt_schema = $sqlt->translate({ data => $schema })
2218     or $self->throw_exception ($sqlt->error);
2219
2220   foreach my $db (@$databases) {
2221     $sqlt->reset();
2222     $sqlt->{schema} = $sqlt_schema;
2223     $sqlt->producer($db);
2224
2225     my $file;
2226     my $filename = $schema->ddl_filename($db, $version, $dir);
2227     if (-e $filename && ($version eq $schema_version )) {
2228       # if we are dumping the current version, overwrite the DDL
2229       carp "Overwriting existing DDL file - $filename";
2230       unlink($filename);
2231     }
2232
2233     my $output = $sqlt->translate;
2234     if(!$output) {
2235       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2236       next;
2237     }
2238     if(!open($file, ">$filename")) {
2239       $self->throw_exception("Can't open $filename for writing ($!)");
2240       next;
2241     }
2242     print $file $output;
2243     close($file);
2244
2245     next unless ($preversion);
2246
2247     require SQL::Translator::Diff;
2248
2249     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2250     if(!-e $prefilename) {
2251       carp("No previous schema file found ($prefilename)");
2252       next;
2253     }
2254
2255     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2256     if(-e $difffile) {
2257       carp("Overwriting existing diff file - $difffile");
2258       unlink($difffile);
2259     }
2260
2261     my $source_schema;
2262     {
2263       my $t = SQL::Translator->new($sqltargs);
2264       $t->debug( 0 );
2265       $t->trace( 0 );
2266
2267       $t->parser( $db )
2268         or $self->throw_exception ($t->error);
2269
2270       my $out = $t->translate( $prefilename )
2271         or $self->throw_exception ($t->error);
2272
2273       $source_schema = $t->schema;
2274
2275       $source_schema->name( $prefilename )
2276         unless ( $source_schema->name );
2277     }
2278
2279     # The "new" style of producers have sane normalization and can support
2280     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2281     # And we have to diff parsed SQL against parsed SQL.
2282     my $dest_schema = $sqlt_schema;
2283
2284     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2285       my $t = SQL::Translator->new($sqltargs);
2286       $t->debug( 0 );
2287       $t->trace( 0 );
2288
2289       $t->parser( $db )
2290         or $self->throw_exception ($t->error);
2291
2292       my $out = $t->translate( $filename )
2293         or $self->throw_exception ($t->error);
2294
2295       $dest_schema = $t->schema;
2296
2297       $dest_schema->name( $filename )
2298         unless $dest_schema->name;
2299     }
2300
2301     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2302                                                   $dest_schema,   $db,
2303                                                   $sqltargs
2304                                                  );
2305     if(!open $file, ">$difffile") {
2306       $self->throw_exception("Can't write to $difffile ($!)");
2307       next;
2308     }
2309     print $file $diff;
2310     close($file);
2311   }
2312 }
2313
2314 =head2 deployment_statements
2315
2316 =over 4
2317
2318 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2319
2320 =back
2321
2322 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2323
2324 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2325 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2326
2327 C<$directory> is used to return statements from files in a previously created
2328 L</create_ddl_dir> directory and is optional. The filenames are constructed
2329 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2330
2331 If no C<$directory> is specified then the statements are constructed on the
2332 fly using L<SQL::Translator> and C<$version> is ignored.
2333
2334 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2335
2336 =cut
2337
2338 sub deployment_statements {
2339   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2340   $type ||= $self->sqlt_type;
2341   $version ||= $schema->schema_version || '1.x';
2342   $dir ||= './';
2343   my $filename = $schema->ddl_filename($type, $version, $dir);
2344   if(-f $filename)
2345   {
2346       my $file;
2347       open($file, "<$filename")
2348         or $self->throw_exception("Can't open $filename ($!)");
2349       my @rows = <$file>;
2350       close($file);
2351       return join('', @rows);
2352   }
2353
2354   $self->throw_exception("Can't deploy without either SQL::Translator or a ddl_dir: " . $self->_sqlt_version_error )
2355     if !$self->_sqlt_version_ok;
2356
2357   # sources needs to be a parser arg, but for simplicty allow at top level
2358   # coming in
2359   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2360       if exists $sqltargs->{sources};
2361
2362   my $tr = SQL::Translator->new(
2363     producer => "SQL::Translator::Producer::${type}",
2364     %$sqltargs,
2365     parser => 'SQL::Translator::Parser::DBIx::Class',
2366     data => $schema,
2367   );
2368
2369   my $ret = $tr->translate
2370     or $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error);
2371
2372   return $ret;
2373 }
2374
2375 sub deploy {
2376   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2377   my $deploy = sub {
2378     my $line = shift;
2379     return if($line =~ /^--/);
2380     return if(!$line);
2381     # next if($line =~ /^DROP/m);
2382     return if($line =~ /^BEGIN TRANSACTION/m);
2383     return if($line =~ /^COMMIT/m);
2384     return if $line =~ /^\s+$/; # skip whitespace only
2385     $self->_query_start($line);
2386     eval {
2387       # do a dbh_do cycle here, as we need some error checking in
2388       # place (even though we will ignore errors)
2389       $self->dbh_do (sub { $_[1]->do($line) });
2390     };
2391     if ($@) {
2392       carp qq{$@ (running "${line}")};
2393     }
2394     $self->_query_end($line);
2395   };
2396   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2397   if (@statements > 1) {
2398     foreach my $statement (@statements) {
2399       $deploy->( $statement );
2400     }
2401   }
2402   elsif (@statements == 1) {
2403     foreach my $line ( split(";\n", $statements[0])) {
2404       $deploy->( $line );
2405     }
2406   }
2407 }
2408
2409 =head2 datetime_parser
2410
2411 Returns the datetime parser class
2412
2413 =cut
2414
2415 sub datetime_parser {
2416   my $self = shift;
2417   return $self->{datetime_parser} ||= do {
2418     $self->build_datetime_parser(@_);
2419   };
2420 }
2421
2422 =head2 datetime_parser_type
2423
2424 Defines (returns) the datetime parser class - currently hardwired to
2425 L<DateTime::Format::MySQL>
2426
2427 =cut
2428
2429 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2430
2431 =head2 build_datetime_parser
2432
2433 See L</datetime_parser>
2434
2435 =cut
2436
2437 sub build_datetime_parser {
2438   my $self = shift;
2439   my $type = $self->datetime_parser_type(@_);
2440   $self->ensure_class_loaded ($type);
2441   return $type;
2442 }
2443
2444
2445 =head2 is_replicating
2446
2447 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2448 replicate from a master database.  Default is undef, which is the result
2449 returned by databases that don't support replication.
2450
2451 =cut
2452
2453 sub is_replicating {
2454     return;
2455
2456 }
2457
2458 =head2 lag_behind_master
2459
2460 Returns a number that represents a certain amount of lag behind a master db
2461 when a given storage is replicating.  The number is database dependent, but
2462 starts at zero and increases with the amount of lag. Default in undef
2463
2464 =cut
2465
2466 sub lag_behind_master {
2467     return;
2468 }
2469
2470 # SQLT version handling
2471 {
2472   my $_sqlt_version_ok;     # private
2473   my $_sqlt_version_error;  # private
2474
2475   sub _sqlt_version_ok {
2476     if (!defined $_sqlt_version_ok) {
2477       eval "use SQL::Translator $minimum_sqlt_version";
2478       if ($@) {
2479         $_sqlt_version_ok = 0;
2480         $_sqlt_version_error = $@;
2481       }
2482       else {
2483         $_sqlt_version_ok = 1;
2484       }
2485     }
2486     return $_sqlt_version_ok;
2487   }
2488
2489   sub _sqlt_version_error {
2490     shift->_sqlt_version_ok unless defined $_sqlt_version_ok;
2491     return $_sqlt_version_error;
2492   }
2493
2494   sub _sqlt_minimum_version { $minimum_sqlt_version };
2495 }
2496
2497 sub DESTROY {
2498   my $self = shift;
2499
2500   $self->_verify_pid if $self->_dbh;
2501
2502   # some databases need this to stop spewing warnings
2503   if (my $dbh = $self->_dbh) {
2504     local $@;
2505     eval { $dbh->disconnect };
2506   }
2507
2508   $self->_dbh(undef);
2509 }
2510
2511 1;
2512
2513 =head1 USAGE NOTES
2514
2515 =head2 DBIx::Class and AutoCommit
2516
2517 DBIx::Class can do some wonderful magic with handling exceptions,
2518 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2519 (the default) combined with C<txn_do> for transaction support.
2520
2521 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2522 in an assumed transaction between commits, and you're telling us you'd
2523 like to manage that manually.  A lot of the magic protections offered by
2524 this module will go away.  We can't protect you from exceptions due to database
2525 disconnects because we don't know anything about how to restart your
2526 transactions.  You're on your own for handling all sorts of exceptional
2527 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2528 be with raw DBI.
2529
2530
2531 =head1 AUTHORS
2532
2533 Matt S. Trout <mst@shadowcatsystems.co.uk>
2534
2535 Andy Grundman <andy@hybridized.org>
2536
2537 =head1 LICENSE
2538
2539 You may distribute this code under the same terms as Perl itself.
2540
2541 =cut