refactor part 2
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17
18 # what version of sqlt do we require if deploy() without a ddl_dir is invoked
19 # when changing also adjust the corresponding author_require in Makefile.PL
20 my $minimum_sqlt_version = '0.11002';
21
22
23 __PACKAGE__->mk_group_accessors('simple' =>
24   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
25      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
26 );
27
28 # the values for these accessors are picked out (and deleted) from
29 # the attribute hashref passed to connect_info
30 my @storage_options = qw/
31   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
32   disable_sth_caching unsafe auto_savepoint
33 /;
34 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
35
36
37 # default cursor class, overridable in connect_info attributes
38 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
39
40 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
41 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
42
43
44 # Each of these methods need _determine_driver called before itself
45 # in order to function reliably. This is a purely DRY optimization
46 my @rdbms_specific_methods = qw/
47   sqlt_type
48   build_datetime_parser
49   datetime_parser_type
50
51   insert
52   insert_bulk
53   update
54   delete
55   select
56   select_single
57 /;
58
59 for my $meth (@rdbms_specific_methods) {
60
61   my $orig = __PACKAGE__->can ($meth)
62     or next;
63
64   no strict qw/refs/;
65   no warnings qw/redefine/;
66   *{__PACKAGE__ ."::$meth"} = sub {
67     if (not $_[0]->_driver_determined) {
68       $_[0]->_determine_driver;
69       goto $_[0]->can($meth);
70     }
71     $orig->(@_);
72   };
73 }
74
75
76 =head1 NAME
77
78 DBIx::Class::Storage::DBI - DBI storage handler
79
80 =head1 SYNOPSIS
81
82   my $schema = MySchema->connect('dbi:SQLite:my.db');
83
84   $schema->storage->debug(1);
85
86   my @stuff = $schema->storage->dbh_do(
87     sub {
88       my ($storage, $dbh, @args) = @_;
89       $dbh->do("DROP TABLE authors");
90     },
91     @column_list
92   );
93
94   $schema->resultset('Book')->search({
95      written_on => $schema->storage->datetime_parser(DateTime->now)
96   });
97
98 =head1 DESCRIPTION
99
100 This class represents the connection to an RDBMS via L<DBI>.  See
101 L<DBIx::Class::Storage> for general information.  This pod only
102 documents DBI-specific methods and behaviors.
103
104 =head1 METHODS
105
106 =cut
107
108 sub new {
109   my $new = shift->next::method(@_);
110
111   $new->transaction_depth(0);
112   $new->_sql_maker_opts({});
113   $new->{savepoints} = [];
114   $new->{_in_dbh_do} = 0;
115   $new->{_dbh_gen} = 0;
116
117   $new;
118 }
119
120 =head2 connect_info
121
122 This method is normally called by L<DBIx::Class::Schema/connection>, which
123 encapsulates its argument list in an arrayref before passing them here.
124
125 The argument list may contain:
126
127 =over
128
129 =item *
130
131 The same 4-element argument set one would normally pass to
132 L<DBI/connect>, optionally followed by
133 L<extra attributes|/DBIx::Class specific connection attributes>
134 recognized by DBIx::Class:
135
136   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
137
138 =item *
139
140 A single code reference which returns a connected
141 L<DBI database handle|DBI/connect> optionally followed by
142 L<extra attributes|/DBIx::Class specific connection attributes> recognized
143 by DBIx::Class:
144
145   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
146
147 =item *
148
149 A single hashref with all the attributes and the dsn/user/password
150 mixed together:
151
152   $connect_info_args = [{
153     dsn => $dsn,
154     user => $user,
155     password => $pass,
156     %dbi_attributes,
157     %extra_attributes,
158   }];
159
160   $connect_info_args = [{
161     dbh_maker => sub { DBI->connect (...) },
162     %dbi_attributes,
163     %extra_attributes,
164   }];
165
166 This is particularly useful for L<Catalyst> based applications, allowing the
167 following config (L<Config::General> style):
168
169   <Model::DB>
170     schema_class   App::DB
171     <connect_info>
172       dsn          dbi:mysql:database=test
173       user         testuser
174       password     TestPass
175       AutoCommit   1
176     </connect_info>
177   </Model::DB>
178
179 The C<dsn>/C<user>/C<password> combination can be substituted by the
180 C<dbh_maker> key whose value is a coderef that returns a connected
181 L<DBI database handle|DBI/connect>
182
183 =back
184
185 Please note that the L<DBI> docs recommend that you always explicitly
186 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
187 recommends that it be set to I<1>, and that you perform transactions
188 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
189 to I<1> if you do not do explicitly set it to zero.  This is the default
190 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
191
192 =head3 DBIx::Class specific connection attributes
193
194 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
195 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
196 the following connection options. These options can be mixed in with your other
197 L<DBI> connection attributes, or placed in a seperate hashref
198 (C<\%extra_attributes>) as shown above.
199
200 Every time C<connect_info> is invoked, any previous settings for
201 these options will be cleared before setting the new ones, regardless of
202 whether any options are specified in the new C<connect_info>.
203
204
205 =over
206
207 =item on_connect_do
208
209 Specifies things to do immediately after connecting or re-connecting to
210 the database.  Its value may contain:
211
212 =over
213
214 =item a scalar
215
216 This contains one SQL statement to execute.
217
218 =item an array reference
219
220 This contains SQL statements to execute in order.  Each element contains
221 a string or a code reference that returns a string.
222
223 =item a code reference
224
225 This contains some code to execute.  Unlike code references within an
226 array reference, its return value is ignored.
227
228 =back
229
230 =item on_disconnect_do
231
232 Takes arguments in the same form as L</on_connect_do> and executes them
233 immediately before disconnecting from the database.
234
235 Note, this only runs if you explicitly call L</disconnect> on the
236 storage object.
237
238 =item on_connect_call
239
240 A more generalized form of L</on_connect_do> that calls the specified
241 C<connect_call_METHOD> methods in your storage driver.
242
243   on_connect_do => 'select 1'
244
245 is equivalent to:
246
247   on_connect_call => [ [ do_sql => 'select 1' ] ]
248
249 Its values may contain:
250
251 =over
252
253 =item a scalar
254
255 Will call the C<connect_call_METHOD> method.
256
257 =item a code reference
258
259 Will execute C<< $code->($storage) >>
260
261 =item an array reference
262
263 Each value can be a method name or code reference.
264
265 =item an array of arrays
266
267 For each array, the first item is taken to be the C<connect_call_> method name
268 or code reference, and the rest are parameters to it.
269
270 =back
271
272 Some predefined storage methods you may use:
273
274 =over
275
276 =item do_sql
277
278 Executes a SQL string or a code reference that returns a SQL string. This is
279 what L</on_connect_do> and L</on_disconnect_do> use.
280
281 It can take:
282
283 =over
284
285 =item a scalar
286
287 Will execute the scalar as SQL.
288
289 =item an arrayref
290
291 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
292 attributes hashref and bind values.
293
294 =item a code reference
295
296 Will execute C<< $code->($storage) >> and execute the return array refs as
297 above.
298
299 =back
300
301 =item datetime_setup
302
303 Execute any statements necessary to initialize the database session to return
304 and accept datetime/timestamp values used with
305 L<DBIx::Class::InflateColumn::DateTime>.
306
307 Only necessary for some databases, see your specific storage driver for
308 implementation details.
309
310 =back
311
312 =item on_disconnect_call
313
314 Takes arguments in the same form as L</on_connect_call> and executes them
315 immediately before disconnecting from the database.
316
317 Calls the C<disconnect_call_METHOD> methods as opposed to the
318 C<connect_call_METHOD> methods called by L</on_connect_call>.
319
320 Note, this only runs if you explicitly call L</disconnect> on the
321 storage object.
322
323 =item disable_sth_caching
324
325 If set to a true value, this option will disable the caching of
326 statement handles via L<DBI/prepare_cached>.
327
328 =item limit_dialect
329
330 Sets the limit dialect. This is useful for JDBC-bridge among others
331 where the remote SQL-dialect cannot be determined by the name of the
332 driver alone. See also L<SQL::Abstract::Limit>.
333
334 =item quote_char
335
336 Specifies what characters to use to quote table and column names. If
337 you use this you will want to specify L</name_sep> as well.
338
339 C<quote_char> expects either a single character, in which case is it
340 is placed on either side of the table/column name, or an arrayref of length
341 2 in which case the table/column name is placed between the elements.
342
343 For example under MySQL you should use C<< quote_char => '`' >>, and for
344 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
345
346 =item name_sep
347
348 This only needs to be used in conjunction with C<quote_char>, and is used to
349 specify the charecter that seperates elements (schemas, tables, columns) from
350 each other. In most cases this is simply a C<.>.
351
352 The consequences of not supplying this value is that L<SQL::Abstract>
353 will assume DBIx::Class' uses of aliases to be complete column
354 names. The output will look like I<"me.name"> when it should actually
355 be I<"me"."name">.
356
357 =item unsafe
358
359 This Storage driver normally installs its own C<HandleError>, sets
360 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
361 all database handles, including those supplied by a coderef.  It does this
362 so that it can have consistent and useful error behavior.
363
364 If you set this option to a true value, Storage will not do its usual
365 modifications to the database handle's attributes, and instead relies on
366 the settings in your connect_info DBI options (or the values you set in
367 your connection coderef, in the case that you are connecting via coderef).
368
369 Note that your custom settings can cause Storage to malfunction,
370 especially if you set a C<HandleError> handler that suppresses exceptions
371 and/or disable C<RaiseError>.
372
373 =item auto_savepoint
374
375 If this option is true, L<DBIx::Class> will use savepoints when nesting
376 transactions, making it possible to recover from failure in the inner
377 transaction without having to abort all outer transactions.
378
379 =item cursor_class
380
381 Use this argument to supply a cursor class other than the default
382 L<DBIx::Class::Storage::DBI::Cursor>.
383
384 =back
385
386 Some real-life examples of arguments to L</connect_info> and
387 L<DBIx::Class::Schema/connect>
388
389   # Simple SQLite connection
390   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
391
392   # Connect via subref
393   ->connect_info([ sub { DBI->connect(...) } ]);
394
395   # Connect via subref in hashref
396   ->connect_info([{
397     dbh_maker => sub { DBI->connect(...) },
398     on_connect_do => 'alter session ...',
399   }]);
400
401   # A bit more complicated
402   ->connect_info(
403     [
404       'dbi:Pg:dbname=foo',
405       'postgres',
406       'my_pg_password',
407       { AutoCommit => 1 },
408       { quote_char => q{"}, name_sep => q{.} },
409     ]
410   );
411
412   # Equivalent to the previous example
413   ->connect_info(
414     [
415       'dbi:Pg:dbname=foo',
416       'postgres',
417       'my_pg_password',
418       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
419     ]
420   );
421
422   # Same, but with hashref as argument
423   # See parse_connect_info for explanation
424   ->connect_info(
425     [{
426       dsn         => 'dbi:Pg:dbname=foo',
427       user        => 'postgres',
428       password    => 'my_pg_password',
429       AutoCommit  => 1,
430       quote_char  => q{"},
431       name_sep    => q{.},
432     }]
433   );
434
435   # Subref + DBIx::Class-specific connection options
436   ->connect_info(
437     [
438       sub { DBI->connect(...) },
439       {
440           quote_char => q{`},
441           name_sep => q{@},
442           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
443           disable_sth_caching => 1,
444       },
445     ]
446   );
447
448
449
450 =cut
451
452 sub connect_info {
453   my ($self, $info_arg) = @_;
454
455   return $self->_connect_info if !$info_arg;
456
457   my @args = @$info_arg;  # take a shallow copy for further mutilation
458   $self->_connect_info([@args]); # copy for _connect_info
459
460
461   # combine/pre-parse arguments depending on invocation style
462
463   my %attrs;
464   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
465     %attrs = %{ $args[1] || {} };
466     @args = $args[0];
467   }
468   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
469     %attrs = %{$args[0]};
470     @args = ();
471     if (my $code = delete $attrs{dbh_maker}) {
472       @args = $code;
473
474       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
475       if (@ignored) {
476         carp sprintf (
477             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
478           . "to the result of 'dbh_maker'",
479
480           join (', ', map { "'$_'" } (@ignored) ),
481         );
482       }
483     }
484     else {
485       @args = delete @attrs{qw/dsn user password/};
486     }
487   }
488   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
489     %attrs = (
490       % { $args[3] || {} },
491       % { $args[4] || {} },
492     );
493     @args = @args[0,1,2];
494   }
495
496   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
497   #  the new set of options
498   $self->_sql_maker(undef);
499   $self->_sql_maker_opts({});
500
501   if(keys %attrs) {
502     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
503       if(my $value = delete $attrs{$storage_opt}) {
504         $self->$storage_opt($value);
505       }
506     }
507     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
508       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
509         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
510       }
511     }
512   }
513
514   if (ref $args[0] eq 'CODE') {
515     # _connect() never looks past $args[0] in this case
516     %attrs = ()
517   } else {
518     %attrs = (
519       %{ $self->_default_dbi_connect_attributes || {} },
520       %attrs,
521     );
522   }
523
524   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
525   $self->_connect_info;
526 }
527
528 sub _default_dbi_connect_attributes {
529   return {
530     AutoCommit => 1,
531     RaiseError => 1,
532     PrintError => 0,
533   };
534 }
535
536 =head2 on_connect_do
537
538 This method is deprecated in favour of setting via L</connect_info>.
539
540 =cut
541
542 =head2 on_disconnect_do
543
544 This method is deprecated in favour of setting via L</connect_info>.
545
546 =cut
547
548 sub _parse_connect_do {
549   my ($self, $type) = @_;
550
551   my $val = $self->$type;
552   return () if not defined $val;
553
554   my @res;
555
556   if (not ref($val)) {
557     push @res, [ 'do_sql', $val ];
558   } elsif (ref($val) eq 'CODE') {
559     push @res, $val;
560   } elsif (ref($val) eq 'ARRAY') {
561     push @res, map { [ 'do_sql', $_ ] } @$val;
562   } else {
563     $self->throw_exception("Invalid type for $type: ".ref($val));
564   }
565
566   return \@res;
567 }
568
569 =head2 dbh_do
570
571 Arguments: ($subref | $method_name), @extra_coderef_args?
572
573 Execute the given $subref or $method_name using the new exception-based
574 connection management.
575
576 The first two arguments will be the storage object that C<dbh_do> was called
577 on and a database handle to use.  Any additional arguments will be passed
578 verbatim to the called subref as arguments 2 and onwards.
579
580 Using this (instead of $self->_dbh or $self->dbh) ensures correct
581 exception handling and reconnection (or failover in future subclasses).
582
583 Your subref should have no side-effects outside of the database, as
584 there is the potential for your subref to be partially double-executed
585 if the database connection was stale/dysfunctional.
586
587 Example:
588
589   my @stuff = $schema->storage->dbh_do(
590     sub {
591       my ($storage, $dbh, @cols) = @_;
592       my $cols = join(q{, }, @cols);
593       $dbh->selectrow_array("SELECT $cols FROM foo");
594     },
595     @column_list
596   );
597
598 =cut
599
600 sub dbh_do {
601   my $self = shift;
602   my $code = shift;
603
604   my $dbh = $self->_get_dbh;
605
606   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
607       || $self->{transaction_depth};
608
609   local $self->{_in_dbh_do} = 1;
610
611   my @result;
612   my $want_array = wantarray;
613
614   eval {
615
616     if($want_array) {
617         @result = $self->$code($dbh, @_);
618     }
619     elsif(defined $want_array) {
620         $result[0] = $self->$code($dbh, @_);
621     }
622     else {
623         $self->$code($dbh, @_);
624     }
625   };
626
627   # ->connected might unset $@ - copy
628   my $exception = $@;
629   if(!$exception) { return $want_array ? @result : $result[0] }
630
631   $self->throw_exception($exception) if $self->connected;
632
633   # We were not connected - reconnect and retry, but let any
634   #  exception fall right through this time
635   carp "Retrying $code after catching disconnected exception: $exception"
636     if $ENV{DBIC_DBIRETRY_DEBUG};
637   $self->_populate_dbh;
638   $self->$code($self->_dbh, @_);
639 }
640
641 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
642 # It also informs dbh_do to bypass itself while under the direction of txn_do,
643 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
644 sub txn_do {
645   my $self = shift;
646   my $coderef = shift;
647
648   ref $coderef eq 'CODE' or $self->throw_exception
649     ('$coderef must be a CODE reference');
650
651   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
652
653   local $self->{_in_dbh_do} = 1;
654
655   my @result;
656   my $want_array = wantarray;
657
658   my $tried = 0;
659   while(1) {
660     eval {
661       $self->_get_dbh;
662
663       $self->txn_begin;
664       if($want_array) {
665           @result = $coderef->(@_);
666       }
667       elsif(defined $want_array) {
668           $result[0] = $coderef->(@_);
669       }
670       else {
671           $coderef->(@_);
672       }
673       $self->txn_commit;
674     };
675
676     # ->connected might unset $@ - copy
677     my $exception = $@;
678     if(!$exception) { return $want_array ? @result : $result[0] }
679
680     if($tried++ || $self->connected) {
681       eval { $self->txn_rollback };
682       my $rollback_exception = $@;
683       if($rollback_exception) {
684         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
685         $self->throw_exception($exception)  # propagate nested rollback
686           if $rollback_exception =~ /$exception_class/;
687
688         $self->throw_exception(
689           "Transaction aborted: ${exception}. "
690           . "Rollback failed: ${rollback_exception}"
691         );
692       }
693       $self->throw_exception($exception)
694     }
695
696     # We were not connected, and was first try - reconnect and retry
697     # via the while loop
698     carp "Retrying $coderef after catching disconnected exception: $exception"
699       if $ENV{DBIC_DBIRETRY_DEBUG};
700     $self->_populate_dbh;
701   }
702 }
703
704 =head2 disconnect
705
706 Our C<disconnect> method also performs a rollback first if the
707 database is not in C<AutoCommit> mode.
708
709 =cut
710
711 sub disconnect {
712   my ($self) = @_;
713
714   if( $self->_dbh ) {
715     my @actions;
716
717     push @actions, ( $self->on_disconnect_call || () );
718     push @actions, $self->_parse_connect_do ('on_disconnect_do');
719
720     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
721
722     $self->_dbh_rollback unless $self->_dbh_autocommit;
723
724     $self->_dbh->disconnect;
725     $self->_dbh(undef);
726     $self->{_dbh_gen}++;
727   }
728 }
729
730 =head2 with_deferred_fk_checks
731
732 =over 4
733
734 =item Arguments: C<$coderef>
735
736 =item Return Value: The return value of $coderef
737
738 =back
739
740 Storage specific method to run the code ref with FK checks deferred or
741 in MySQL's case disabled entirely.
742
743 =cut
744
745 # Storage subclasses should override this
746 sub with_deferred_fk_checks {
747   my ($self, $sub) = @_;
748   $sub->();
749 }
750
751 =head2 connected
752
753 =over
754
755 =item Arguments: none
756
757 =item Return Value: 1|0
758
759 =back
760
761 Verifies that the the current database handle is active and ready to execute
762 an SQL statement (i.e. the connection did not get stale, server is still
763 answering, etc.) This method is used internally by L</dbh>.
764
765 =cut
766
767 sub connected {
768   my $self = shift;
769   return 0 unless $self->_seems_connected;
770
771   #be on the safe side
772   local $self->_dbh->{RaiseError} = 1;
773
774   return $self->_ping;
775 }
776
777 sub _seems_connected {
778   my $self = shift;
779
780   my $dbh = $self->_dbh
781     or return 0;
782
783   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
784     $self->_dbh(undef);
785     $self->{_dbh_gen}++;
786     return 0;
787   }
788   else {
789     $self->_verify_pid;
790     return 0 if !$self->_dbh;
791   }
792
793   return $dbh->FETCH('Active');
794 }
795
796 sub _ping {
797   my $self = shift;
798
799   my $dbh = $self->_dbh or return 0;
800
801   return $dbh->ping;
802 }
803
804 # handle pid changes correctly
805 #  NOTE: assumes $self->_dbh is a valid $dbh
806 sub _verify_pid {
807   my ($self) = @_;
808
809   return if defined $self->_conn_pid && $self->_conn_pid == $$;
810
811   $self->_dbh->{InactiveDestroy} = 1;
812   $self->_dbh(undef);
813   $self->{_dbh_gen}++;
814
815   return;
816 }
817
818 sub ensure_connected {
819   my ($self) = @_;
820
821   unless ($self->connected) {
822     $self->_populate_dbh;
823   }
824 }
825
826 =head2 dbh
827
828 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
829 is guaranteed to be healthy by implicitly calling L</connected>, and if
830 necessary performing a reconnection before returning. Keep in mind that this
831 is very B<expensive> on some database engines. Consider using L<dbh_do>
832 instead.
833
834 =cut
835
836 sub dbh {
837   my ($self) = @_;
838
839   if (not $self->_dbh) {
840     $self->_populate_dbh;
841   } else {
842     $self->ensure_connected;
843   }
844   return $self->_dbh;
845 }
846
847 # this is the internal "get dbh or connect (don't check)" method
848 sub _get_dbh {
849   my $self = shift;
850   $self->_verify_pid if $self->_dbh;
851   $self->_populate_dbh unless $self->_dbh;
852   return $self->_dbh;
853 }
854
855 sub _sql_maker_args {
856     my ($self) = @_;
857
858     return (
859       bindtype=>'columns',
860       array_datatypes => 1,
861       limit_dialect => $self->_get_dbh,
862       %{$self->_sql_maker_opts}
863     );
864 }
865
866 sub sql_maker {
867   my ($self) = @_;
868   unless ($self->_sql_maker) {
869     my $sql_maker_class = $self->sql_maker_class;
870     $self->ensure_class_loaded ($sql_maker_class);
871     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
872   }
873   return $self->_sql_maker;
874 }
875
876 # nothing to do by default
877 sub _rebless {}
878 sub _init {}
879
880 sub _populate_dbh {
881   my ($self) = @_;
882
883   my @info = @{$self->_dbi_connect_info || []};
884   $self->_dbh(undef); # in case ->connected failed we might get sent here
885   $self->_dbh($self->_connect(@info));
886
887   $self->_conn_pid($$);
888   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
889
890   $self->_determine_driver;
891
892   # Always set the transaction depth on connect, since
893   #  there is no transaction in progress by definition
894   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
895
896   $self->_run_connection_actions unless $self->{_in_determine_driver};
897 }
898
899 sub _run_connection_actions {
900   my $self = shift;
901   my @actions;
902
903   push @actions, ( $self->on_connect_call || () );
904   push @actions, $self->_parse_connect_do ('on_connect_do');
905
906   $self->_do_connection_actions(connect_call_ => $_) for @actions;
907 }
908
909 sub _determine_driver {
910   my ($self) = @_;
911
912   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
913     my $started_connected = 0;
914     local $self->{_in_determine_driver} = 1;
915
916     if (ref($self) eq __PACKAGE__) {
917       my $driver;
918       if ($self->_dbh) { # we are connected
919         $driver = $self->_dbh->{Driver}{Name};
920         $started_connected = 1;
921       } else {
922         # if connect_info is a CODEREF, we have no choice but to connect
923         if (ref $self->_dbi_connect_info->[0] &&
924             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
925           $self->_populate_dbh;
926           $driver = $self->_dbh->{Driver}{Name};
927         }
928         else {
929           # try to use dsn to not require being connected, the driver may still
930           # force a connection in _rebless to determine version
931           ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
932         }
933       }
934
935       my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
936       if ($self->load_optional_class($storage_class)) {
937         mro::set_mro($storage_class, 'c3');
938         bless $self, $storage_class;
939         $self->_rebless();
940       }
941     }
942
943     $self->_driver_determined(1);
944
945     $self->_init; # run driver-specific initializations
946
947     $self->_run_connection_actions
948         if !$started_connected && defined $self->_dbh;
949   }
950 }
951
952 sub _do_connection_actions {
953   my $self          = shift;
954   my $method_prefix = shift;
955   my $call          = shift;
956
957   if (not ref($call)) {
958     my $method = $method_prefix . $call;
959     $self->$method(@_);
960   } elsif (ref($call) eq 'CODE') {
961     $self->$call(@_);
962   } elsif (ref($call) eq 'ARRAY') {
963     if (ref($call->[0]) ne 'ARRAY') {
964       $self->_do_connection_actions($method_prefix, $_) for @$call;
965     } else {
966       $self->_do_connection_actions($method_prefix, @$_) for @$call;
967     }
968   } else {
969     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
970   }
971
972   return $self;
973 }
974
975 sub connect_call_do_sql {
976   my $self = shift;
977   $self->_do_query(@_);
978 }
979
980 sub disconnect_call_do_sql {
981   my $self = shift;
982   $self->_do_query(@_);
983 }
984
985 # override in db-specific backend when necessary
986 sub connect_call_datetime_setup { 1 }
987
988 sub _do_query {
989   my ($self, $action) = @_;
990
991   if (ref $action eq 'CODE') {
992     $action = $action->($self);
993     $self->_do_query($_) foreach @$action;
994   }
995   else {
996     # Most debuggers expect ($sql, @bind), so we need to exclude
997     # the attribute hash which is the second argument to $dbh->do
998     # furthermore the bind values are usually to be presented
999     # as named arrayref pairs, so wrap those here too
1000     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1001     my $sql = shift @do_args;
1002     my $attrs = shift @do_args;
1003     my @bind = map { [ undef, $_ ] } @do_args;
1004
1005     $self->_query_start($sql, @bind);
1006     $self->_get_dbh->do($sql, $attrs, @do_args);
1007     $self->_query_end($sql, @bind);
1008   }
1009
1010   return $self;
1011 }
1012
1013 sub _connect {
1014   my ($self, @info) = @_;
1015
1016   $self->throw_exception("You failed to provide any connection info")
1017     if !@info;
1018
1019   my ($old_connect_via, $dbh);
1020
1021   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1022     $old_connect_via = $DBI::connect_via;
1023     $DBI::connect_via = 'connect';
1024   }
1025
1026   eval {
1027     if(ref $info[0] eq 'CODE') {
1028        $dbh = &{$info[0]}
1029     }
1030     else {
1031        $dbh = DBI->connect(@info);
1032     }
1033
1034     if($dbh && !$self->unsafe) {
1035       my $weak_self = $self;
1036       Scalar::Util::weaken($weak_self);
1037       $dbh->{HandleError} = sub {
1038           if ($weak_self) {
1039             $weak_self->throw_exception("DBI Exception: $_[0]");
1040           }
1041           else {
1042             # the handler may be invoked by something totally out of
1043             # the scope of DBIC
1044             croak ("DBI Exception: $_[0]");
1045           }
1046       };
1047       $dbh->{ShowErrorStatement} = 1;
1048       $dbh->{RaiseError} = 1;
1049       $dbh->{PrintError} = 0;
1050     }
1051   };
1052
1053   $DBI::connect_via = $old_connect_via if $old_connect_via;
1054
1055   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1056     if !$dbh || $@;
1057
1058   $self->_dbh_autocommit($dbh->{AutoCommit});
1059
1060   $dbh;
1061 }
1062
1063 sub svp_begin {
1064   my ($self, $name) = @_;
1065
1066   $name = $self->_svp_generate_name
1067     unless defined $name;
1068
1069   $self->throw_exception ("You can't use savepoints outside a transaction")
1070     if $self->{transaction_depth} == 0;
1071
1072   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1073     unless $self->can('_svp_begin');
1074
1075   push @{ $self->{savepoints} }, $name;
1076
1077   $self->debugobj->svp_begin($name) if $self->debug;
1078
1079   return $self->_svp_begin($name);
1080 }
1081
1082 sub svp_release {
1083   my ($self, $name) = @_;
1084
1085   $self->throw_exception ("You can't use savepoints outside a transaction")
1086     if $self->{transaction_depth} == 0;
1087
1088   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1089     unless $self->can('_svp_release');
1090
1091   if (defined $name) {
1092     $self->throw_exception ("Savepoint '$name' does not exist")
1093       unless grep { $_ eq $name } @{ $self->{savepoints} };
1094
1095     # Dig through the stack until we find the one we are releasing.  This keeps
1096     # the stack up to date.
1097     my $svp;
1098
1099     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1100   } else {
1101     $name = pop @{ $self->{savepoints} };
1102   }
1103
1104   $self->debugobj->svp_release($name) if $self->debug;
1105
1106   return $self->_svp_release($name);
1107 }
1108
1109 sub svp_rollback {
1110   my ($self, $name) = @_;
1111
1112   $self->throw_exception ("You can't use savepoints outside a transaction")
1113     if $self->{transaction_depth} == 0;
1114
1115   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1116     unless $self->can('_svp_rollback');
1117
1118   if (defined $name) {
1119       # If they passed us a name, verify that it exists in the stack
1120       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1121           $self->throw_exception("Savepoint '$name' does not exist!");
1122       }
1123
1124       # Dig through the stack until we find the one we are releasing.  This keeps
1125       # the stack up to date.
1126       while(my $s = pop(@{ $self->{savepoints} })) {
1127           last if($s eq $name);
1128       }
1129       # Add the savepoint back to the stack, as a rollback doesn't remove the
1130       # named savepoint, only everything after it.
1131       push(@{ $self->{savepoints} }, $name);
1132   } else {
1133       # We'll assume they want to rollback to the last savepoint
1134       $name = $self->{savepoints}->[-1];
1135   }
1136
1137   $self->debugobj->svp_rollback($name) if $self->debug;
1138
1139   return $self->_svp_rollback($name);
1140 }
1141
1142 sub _svp_generate_name {
1143     my ($self) = @_;
1144
1145     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1146 }
1147
1148 sub txn_begin {
1149   my $self = shift;
1150   if($self->{transaction_depth} == 0) {
1151     $self->debugobj->txn_begin()
1152       if $self->debug;
1153     $self->_dbh_begin_work;
1154   }
1155   elsif ($self->auto_savepoint) {
1156     $self->svp_begin;
1157   }
1158   $self->{transaction_depth}++;
1159 }
1160
1161 sub _dbh_begin_work {
1162   my $self = shift;
1163
1164   # if the user is utilizing txn_do - good for him, otherwise we need to
1165   # ensure that the $dbh is healthy on BEGIN.
1166   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1167   # will be replaced by a failure of begin_work itself (which will be
1168   # then retried on reconnect)
1169   if ($self->{_in_dbh_do}) {
1170     $self->_dbh->begin_work;
1171   } else {
1172     $self->dbh_do(sub { $_[1]->begin_work });
1173   }
1174 }
1175
1176 sub txn_commit {
1177   my $self = shift;
1178   if ($self->{transaction_depth} == 1) {
1179     $self->debugobj->txn_commit()
1180       if ($self->debug);
1181     $self->_dbh_commit;
1182     $self->{transaction_depth} = 0
1183       if $self->_dbh_autocommit;
1184   }
1185   elsif($self->{transaction_depth} > 1) {
1186     $self->{transaction_depth}--;
1187     $self->svp_release
1188       if $self->auto_savepoint;
1189   }
1190 }
1191
1192 sub _dbh_commit {
1193   my $self = shift;
1194   my $dbh  = $self->_dbh
1195     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1196   $dbh->commit;
1197 }
1198
1199 sub txn_rollback {
1200   my $self = shift;
1201   my $dbh = $self->_dbh;
1202   eval {
1203     if ($self->{transaction_depth} == 1) {
1204       $self->debugobj->txn_rollback()
1205         if ($self->debug);
1206       $self->{transaction_depth} = 0
1207         if $self->_dbh_autocommit;
1208       $self->_dbh_rollback;
1209     }
1210     elsif($self->{transaction_depth} > 1) {
1211       $self->{transaction_depth}--;
1212       if ($self->auto_savepoint) {
1213         $self->svp_rollback;
1214         $self->svp_release;
1215       }
1216     }
1217     else {
1218       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1219     }
1220   };
1221   if ($@) {
1222     my $error = $@;
1223     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1224     $error =~ /$exception_class/ and $self->throw_exception($error);
1225     # ensure that a failed rollback resets the transaction depth
1226     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1227     $self->throw_exception($error);
1228   }
1229 }
1230
1231 sub _dbh_rollback {
1232   my $self = shift;
1233   my $dbh  = $self->_dbh
1234     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1235   $dbh->rollback;
1236 }
1237
1238 # This used to be the top-half of _execute.  It was split out to make it
1239 #  easier to override in NoBindVars without duping the rest.  It takes up
1240 #  all of _execute's args, and emits $sql, @bind.
1241 sub _prep_for_execute {
1242   my ($self, $op, $extra_bind, $ident, $args) = @_;
1243
1244   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1245     $ident = $ident->from();
1246   }
1247
1248   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1249
1250   unshift(@bind,
1251     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1252       if $extra_bind;
1253   return ($sql, \@bind);
1254 }
1255
1256
1257 sub _fix_bind_params {
1258     my ($self, @bind) = @_;
1259
1260     ### Turn @bind from something like this:
1261     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1262     ### to this:
1263     ###   ( "'1'", "'1'", "'3'" )
1264     return
1265         map {
1266             if ( defined( $_ && $_->[1] ) ) {
1267                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1268             }
1269             else { q{'NULL'}; }
1270         } @bind;
1271 }
1272
1273 sub _query_start {
1274     my ( $self, $sql, @bind ) = @_;
1275
1276     if ( $self->debug ) {
1277         @bind = $self->_fix_bind_params(@bind);
1278
1279         $self->debugobj->query_start( $sql, @bind );
1280     }
1281 }
1282
1283 sub _query_end {
1284     my ( $self, $sql, @bind ) = @_;
1285
1286     if ( $self->debug ) {
1287         @bind = $self->_fix_bind_params(@bind);
1288         $self->debugobj->query_end( $sql, @bind );
1289     }
1290 }
1291
1292 sub _dbh_execute {
1293   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1294
1295   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1296
1297   $self->_query_start( $sql, @$bind );
1298
1299   my $sth = $self->sth($sql,$op);
1300
1301   my $placeholder_index = 1;
1302
1303   foreach my $bound (@$bind) {
1304     my $attributes = {};
1305     my($column_name, @data) = @$bound;
1306
1307     if ($bind_attributes) {
1308       $attributes = $bind_attributes->{$column_name}
1309       if defined $bind_attributes->{$column_name};
1310     }
1311
1312     foreach my $data (@data) {
1313       my $ref = ref $data;
1314       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1315
1316       $sth->bind_param($placeholder_index, $data, $attributes);
1317       $placeholder_index++;
1318     }
1319   }
1320
1321   # Can this fail without throwing an exception anyways???
1322   my $rv = $sth->execute();
1323   $self->throw_exception($sth->errstr) if !$rv;
1324
1325   $self->_query_end( $sql, @$bind );
1326
1327   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1328 }
1329
1330 sub _execute {
1331     my $self = shift;
1332     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1333 }
1334
1335 sub insert {
1336   my ($self, $source, $to_insert) = @_;
1337
1338   my $ident = $source->from;
1339   my $bind_attributes = $self->source_bind_attributes($source);
1340
1341   my $updated_cols = {};
1342
1343   foreach my $col ( $source->columns ) {
1344     if ( !defined $to_insert->{$col} ) {
1345       my $col_info = $source->column_info($col);
1346
1347       if ( $col_info->{auto_nextval} ) {
1348         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1349           'nextval',
1350           $col_info->{sequence} ||
1351             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
1352         );
1353       }
1354     }
1355   }
1356
1357   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1358
1359   return $updated_cols;
1360 }
1361
1362 ## Still not quite perfect, and EXPERIMENTAL
1363 ## Currently it is assumed that all values passed will be "normal", i.e. not
1364 ## scalar refs, or at least, all the same type as the first set, the statement is
1365 ## only prepped once.
1366 sub insert_bulk {
1367   my ($self, $source, $cols, $data) = @_;
1368
1369   my %colvalues;
1370   @colvalues{@$cols} = (0..$#$cols);
1371
1372   for my $i (0..$#$cols) {
1373     my $first_val = $data->[0][$i];
1374     next unless ref $first_val eq 'SCALAR';
1375
1376     $colvalues{ $cols->[$i] } = $first_val;
1377   }
1378
1379   # check for bad data and stringify stringifiable objects
1380   my $bad_slice = sub {
1381     my ($msg, $col_idx, $slice_idx) = @_;
1382     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1383       $msg,
1384       $cols->[$col_idx],
1385       do {
1386         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1387         Data::Dumper::Concise::Dumper({
1388           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1389         }),
1390       }
1391     );
1392   };
1393
1394   for my $datum_idx (0..$#$data) {
1395     my $datum = $data->[$datum_idx];
1396
1397     for my $col_idx (0..$#$cols) {
1398       my $val            = $datum->[$col_idx];
1399       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1400       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1401
1402       if ($is_literal_sql) {
1403         if (not ref $val) {
1404           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1405         }
1406         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1407           $bad_slice->("$reftype reference found where literal SQL expected",
1408             $col_idx, $datum_idx);
1409         }
1410         elsif ($$val ne $$sqla_bind){
1411           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1412             $col_idx, $datum_idx);
1413         }
1414       }
1415       elsif (my $reftype = ref $val) {
1416         require overload;
1417         if (overload::Method($val, '""')) {
1418           $datum->[$col_idx] = "".$val;
1419         }
1420         else {
1421           $bad_slice->("$reftype reference found where bind expected",
1422             $col_idx, $datum_idx);
1423         }
1424       }
1425     }
1426   }
1427
1428   my ($sql, $bind) = $self->_prep_for_execute (
1429     'insert', undef, $source, [\%colvalues]
1430   );
1431   my @bind = @$bind;
1432
1433   my $empty_bind = 1 if (not @bind) &&
1434     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1435
1436   if ((not @bind) && (not $empty_bind)) {
1437     $self->throw_exception(
1438       'Cannot insert_bulk without support for placeholders'
1439     );
1440   }
1441
1442   $self->_query_start( $sql, ['__BULK__'] );
1443   my $sth = $self->sth($sql);
1444
1445   my $rv = do {
1446     if ($empty_bind) {
1447       # bind_param_array doesn't work if there are no binds
1448       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1449     }
1450     else {
1451 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1452       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1453     }
1454   };
1455
1456   $self->_query_end( $sql, ['__BULK__'] );
1457
1458   return (wantarray ? ($rv, $sth, @bind) : $rv);
1459 }
1460
1461 sub _execute_array {
1462   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1463
1464   my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
1465
1466   ## This must be an arrayref, else nothing works!
1467   my $tuple_status = [];
1468
1469   ## Get the bind_attributes, if any exist
1470   my $bind_attributes = $self->source_bind_attributes($source);
1471
1472   ## Bind the values and execute
1473   my $placeholder_index = 1;
1474
1475   foreach my $bound (@$bind) {
1476
1477     my $attributes = {};
1478     my ($column_name, $data_index) = @$bound;
1479
1480     if( $bind_attributes ) {
1481       $attributes = $bind_attributes->{$column_name}
1482       if defined $bind_attributes->{$column_name};
1483     }
1484
1485     my @data = map { $_->[$data_index] } @$data;
1486
1487     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1488     $placeholder_index++;
1489   }
1490
1491   my $rv = eval {
1492     $self->_dbh_execute_array($sth, $tuple_status, @extra);
1493   };
1494   my $err = $@ || $sth->errstr;
1495
1496 # Statement must finish even if there was an exception.
1497   eval { $sth->finish };
1498   $err = $@ unless $err;
1499
1500   if ($err) {
1501     my $i = 0;
1502     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1503
1504     $self->throw_exception("Unexpected populate error: $err")
1505       if ($i > $#$tuple_status);
1506
1507     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1508       ($tuple_status->[$i][1] || $err),
1509       Data::Dumper::Concise::Dumper({
1510         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1511       }),
1512     );
1513   }
1514
1515   $guard->commit if $guard;
1516
1517   return $rv;
1518 }
1519
1520 sub _dbh_execute_array {
1521     my ($self, $sth, $tuple_status, @extra) = @_;
1522
1523     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1524 }
1525
1526 sub _dbh_execute_inserts_with_no_binds {
1527   my ($self, $sth, $count) = @_;
1528
1529   my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
1530
1531   eval {
1532     my $dbh = $self->_get_dbh;
1533     local $dbh->{RaiseError} = 1;
1534     local $dbh->{PrintError} = 0;
1535
1536     $sth->execute foreach 1..$count;
1537   };
1538   my $exception = $@;
1539
1540 # Make sure statement is finished even if there was an exception.
1541   eval { $sth->finish };
1542   $exception = $@ unless $exception;
1543
1544   $self->throw_exception($exception) if $exception;
1545
1546   $guard->commit if $guard;
1547
1548   return $count;
1549 }
1550
1551 sub update {
1552   my ($self, $source, @args) = @_; 
1553
1554   my $bind_attrs = $self->source_bind_attributes($source);
1555
1556   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1557 }
1558
1559
1560 sub delete {
1561   my ($self, $source, @args) = @_;
1562
1563   my $bind_attrs = $self->source_bind_attributes($source);
1564
1565   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1566 }
1567
1568 # We were sent here because the $rs contains a complex search
1569 # which will require a subquery to select the correct rows
1570 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1571 #
1572 # Generating a single PK column subquery is trivial and supported
1573 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1574 # Look at _multipk_update_delete()
1575 sub _subq_update_delete {
1576   my $self = shift;
1577   my ($rs, $op, $values) = @_;
1578
1579   my $rsrc = $rs->result_source;
1580
1581   # quick check if we got a sane rs on our hands
1582   my @pcols = $rsrc->primary_columns;
1583
1584   my $sel = $rs->_resolved_attrs->{select};
1585   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1586
1587   if (
1588       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1589         ne
1590       join ("\x00", sort @$sel )
1591   ) {
1592     $self->throw_exception (
1593       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1594     );
1595   }
1596
1597   if (@pcols == 1) {
1598     return $self->$op (
1599       $rsrc,
1600       $op eq 'update' ? $values : (),
1601       { $pcols[0] => { -in => $rs->as_query } },
1602     );
1603   }
1604
1605   else {
1606     return $self->_multipk_update_delete (@_);
1607   }
1608 }
1609
1610 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1611 # resultset update/delete involving subqueries. So by default resort
1612 # to simple (and inefficient) delete_all style per-row opearations,
1613 # while allowing specific storages to override this with a faster
1614 # implementation.
1615 #
1616 sub _multipk_update_delete {
1617   return shift->_per_row_update_delete (@_);
1618 }
1619
1620 # This is the default loop used to delete/update rows for multi PK
1621 # resultsets, and used by mysql exclusively (because it can't do anything
1622 # else).
1623 #
1624 # We do not use $row->$op style queries, because resultset update/delete
1625 # is not expected to cascade (this is what delete_all/update_all is for).
1626 #
1627 # There should be no race conditions as the entire operation is rolled
1628 # in a transaction.
1629 #
1630 sub _per_row_update_delete {
1631   my $self = shift;
1632   my ($rs, $op, $values) = @_;
1633
1634   my $rsrc = $rs->result_source;
1635   my @pcols = $rsrc->primary_columns;
1636
1637   my $guard = $self->txn_scope_guard;
1638
1639   # emulate the return value of $sth->execute for non-selects
1640   my $row_cnt = '0E0';
1641
1642   my $subrs_cur = $rs->cursor;
1643   while (my @pks = $subrs_cur->next) {
1644
1645     my $cond;
1646     for my $i (0.. $#pcols) {
1647       $cond->{$pcols[$i]} = $pks[$i];
1648     }
1649
1650     $self->$op (
1651       $rsrc,
1652       $op eq 'update' ? $values : (),
1653       $cond,
1654     );
1655
1656     $row_cnt++;
1657   }
1658
1659   $guard->commit;
1660
1661   return $row_cnt;
1662 }
1663
1664 sub _select {
1665   my $self = shift;
1666
1667   # localization is neccessary as
1668   # 1) there is no infrastructure to pass this around before SQLA2
1669   # 2) _select_args sets it and _prep_for_execute consumes it
1670   my $sql_maker = $self->sql_maker;
1671   local $sql_maker->{_dbic_rs_attrs};
1672
1673   return $self->_execute($self->_select_args(@_));
1674 }
1675
1676 sub _select_args_to_query {
1677   my $self = shift;
1678
1679   # localization is neccessary as
1680   # 1) there is no infrastructure to pass this around before SQLA2
1681   # 2) _select_args sets it and _prep_for_execute consumes it
1682   my $sql_maker = $self->sql_maker;
1683   local $sql_maker->{_dbic_rs_attrs};
1684
1685   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1686   #  = $self->_select_args($ident, $select, $cond, $attrs);
1687   my ($op, $bind, $ident, $bind_attrs, @args) =
1688     $self->_select_args(@_);
1689
1690   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1691   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1692   $prepared_bind ||= [];
1693
1694   return wantarray
1695     ? ($sql, $prepared_bind, $bind_attrs)
1696     : \[ "($sql)", @$prepared_bind ]
1697   ;
1698 }
1699
1700 sub _select_args {
1701   my ($self, $ident, $select, $where, $attrs) = @_;
1702
1703   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1704
1705   my $sql_maker = $self->sql_maker;
1706   $sql_maker->{_dbic_rs_attrs} = {
1707     %$attrs,
1708     select => $select,
1709     from => $ident,
1710     where => $where,
1711     $rs_alias
1712       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1713       : ()
1714     ,
1715   };
1716
1717   # calculate bind_attrs before possible $ident mangling
1718   my $bind_attrs = {};
1719   for my $alias (keys %$alias2source) {
1720     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1721     for my $col (keys %$bindtypes) {
1722
1723       my $fqcn = join ('.', $alias, $col);
1724       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1725
1726       # Unqialified column names are nice, but at the same time can be
1727       # rather ambiguous. What we do here is basically go along with
1728       # the loop, adding an unqualified column slot to $bind_attrs,
1729       # alongside the fully qualified name. As soon as we encounter
1730       # another column by that name (which would imply another table)
1731       # we unset the unqualified slot and never add any info to it
1732       # to avoid erroneous type binding. If this happens the users
1733       # only choice will be to fully qualify his column name
1734
1735       if (exists $bind_attrs->{$col}) {
1736         $bind_attrs->{$col} = {};
1737       }
1738       else {
1739         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1740       }
1741     }
1742   }
1743
1744   # adjust limits
1745   if (
1746     $attrs->{software_limit}
1747       ||
1748     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1749   ) {
1750     $attrs->{software_limit} = 1;
1751   }
1752   else {
1753     $self->throw_exception("rows attribute must be positive if present")
1754       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1755
1756     # MySQL actually recommends this approach.  I cringe.
1757     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1758   }
1759
1760   my @limit;
1761
1762   # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
1763   # otherwise delegate the limiting to the storage, unless software limit was requested
1764   if (
1765     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1766        ||
1767     ( $attrs->{group_by} && @{$attrs->{group_by}} &&
1768       $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
1769   ) {
1770     ($ident, $select, $where, $attrs)
1771       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1772   }
1773   elsif (! $attrs->{software_limit} ) {
1774     push @limit, $attrs->{rows}, $attrs->{offset};
1775   }
1776
1777 ###
1778   # This would be the point to deflate anything found in $where
1779   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1780   # expect a row object. And all we have is a resultsource (it is trivial
1781   # to extract deflator coderefs via $alias2source above).
1782   #
1783   # I don't see a way forward other than changing the way deflators are
1784   # invoked, and that's just bad...
1785 ###
1786
1787   my $order = { map
1788     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1789     (qw/order_by group_by having/ )
1790   };
1791
1792   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1793 }
1794
1795 # Returns a counting SELECT for a simple count
1796 # query. Abstracted so that a storage could override
1797 # this to { count => 'firstcol' } or whatever makes
1798 # sense as a performance optimization
1799 sub _count_select {
1800   #my ($self, $source, $rs_attrs) = @_;
1801   return { count => '*' };
1802 }
1803
1804 # Returns a SELECT which will end up in the subselect
1805 # There may or may not be a group_by, as the subquery
1806 # might have been called to accomodate a limit
1807 #
1808 # Most databases would be happy with whatever ends up
1809 # here, but some choke in various ways.
1810 #
1811 sub _subq_count_select {
1812   my ($self, $source, $rs_attrs) = @_;
1813   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
1814
1815   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1816   return @pcols ? \@pcols : [ 1 ];
1817 }
1818
1819 sub source_bind_attributes {
1820   my ($self, $source) = @_;
1821
1822   my $bind_attributes;
1823   foreach my $column ($source->columns) {
1824
1825     my $data_type = $source->column_info($column)->{data_type} || '';
1826     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1827      if $data_type;
1828   }
1829
1830   return $bind_attributes;
1831 }
1832
1833 =head2 select
1834
1835 =over 4
1836
1837 =item Arguments: $ident, $select, $condition, $attrs
1838
1839 =back
1840
1841 Handle a SQL select statement.
1842
1843 =cut
1844
1845 sub select {
1846   my $self = shift;
1847   my ($ident, $select, $condition, $attrs) = @_;
1848   return $self->cursor_class->new($self, \@_, $attrs);
1849 }
1850
1851 sub select_single {
1852   my $self = shift;
1853   my ($rv, $sth, @bind) = $self->_select(@_);
1854   my @row = $sth->fetchrow_array;
1855   my @nextrow = $sth->fetchrow_array if @row;
1856   if(@row && @nextrow) {
1857     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1858   }
1859   # Need to call finish() to work round broken DBDs
1860   $sth->finish();
1861   return @row;
1862 }
1863
1864 =head2 sth
1865
1866 =over 4
1867
1868 =item Arguments: $sql
1869
1870 =back
1871
1872 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1873
1874 =cut
1875
1876 sub _dbh_sth {
1877   my ($self, $dbh, $sql) = @_;
1878
1879   # 3 is the if_active parameter which avoids active sth re-use
1880   my $sth = $self->disable_sth_caching
1881     ? $dbh->prepare($sql)
1882     : $dbh->prepare_cached($sql, {}, 3);
1883
1884   # XXX You would think RaiseError would make this impossible,
1885   #  but apparently that's not true :(
1886   $self->throw_exception($dbh->errstr) if !$sth;
1887
1888   $sth;
1889 }
1890
1891 sub sth {
1892   my ($self, $sql) = @_;
1893   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
1894 }
1895
1896 sub _dbh_columns_info_for {
1897   my ($self, $dbh, $table) = @_;
1898
1899   if ($dbh->can('column_info')) {
1900     my %result;
1901     eval {
1902       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1903       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1904       $sth->execute();
1905       while ( my $info = $sth->fetchrow_hashref() ){
1906         my %column_info;
1907         $column_info{data_type}   = $info->{TYPE_NAME};
1908         $column_info{size}      = $info->{COLUMN_SIZE};
1909         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1910         $column_info{default_value} = $info->{COLUMN_DEF};
1911         my $col_name = $info->{COLUMN_NAME};
1912         $col_name =~ s/^\"(.*)\"$/$1/;
1913
1914         $result{$col_name} = \%column_info;
1915       }
1916     };
1917     return \%result if !$@ && scalar keys %result;
1918   }
1919
1920   my %result;
1921   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1922   $sth->execute;
1923   my @columns = @{$sth->{NAME_lc}};
1924   for my $i ( 0 .. $#columns ){
1925     my %column_info;
1926     $column_info{data_type} = $sth->{TYPE}->[$i];
1927     $column_info{size} = $sth->{PRECISION}->[$i];
1928     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1929
1930     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1931       $column_info{data_type} = $1;
1932       $column_info{size}    = $2;
1933     }
1934
1935     $result{$columns[$i]} = \%column_info;
1936   }
1937   $sth->finish;
1938
1939   foreach my $col (keys %result) {
1940     my $colinfo = $result{$col};
1941     my $type_num = $colinfo->{data_type};
1942     my $type_name;
1943     if(defined $type_num && $dbh->can('type_info')) {
1944       my $type_info = $dbh->type_info($type_num);
1945       $type_name = $type_info->{TYPE_NAME} if $type_info;
1946       $colinfo->{data_type} = $type_name if $type_name;
1947     }
1948   }
1949
1950   return \%result;
1951 }
1952
1953 sub columns_info_for {
1954   my ($self, $table) = @_;
1955   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
1956 }
1957
1958 =head2 last_insert_id
1959
1960 Return the row id of the last insert.
1961
1962 =cut
1963
1964 sub _dbh_last_insert_id {
1965     # All Storage's need to register their own _dbh_last_insert_id
1966     # the old SQLite-based method was highly inappropriate
1967
1968     my $self = shift;
1969     my $class = ref $self;
1970     $self->throw_exception (<<EOE);
1971
1972 No _dbh_last_insert_id() method found in $class.
1973 Since the method of obtaining the autoincrement id of the last insert
1974 operation varies greatly between different databases, this method must be
1975 individually implemented for every storage class.
1976 EOE
1977 }
1978
1979 sub last_insert_id {
1980   my $self = shift;
1981   $self->_dbh_last_insert_id ($self->_dbh, @_);
1982 }
1983
1984 =head2 _native_data_type
1985
1986 =over 4
1987
1988 =item Arguments: $type_name
1989
1990 =back
1991
1992 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
1993 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
1994 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
1995
1996 The default implementation returns C<undef>, implement in your Storage driver if
1997 you need this functionality.
1998
1999 Should map types from other databases to the native RDBMS type, for example
2000 C<VARCHAR2> to C<VARCHAR>.
2001
2002 Types with modifiers should map to the underlying data type. For example,
2003 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2004
2005 Composite types should map to the container type, for example
2006 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2007
2008 =cut
2009
2010 sub _native_data_type {
2011   #my ($self, $data_type) = @_;
2012   return undef
2013 }
2014
2015 # Check if placeholders are supported at all
2016 sub _placeholders_supported {
2017   my $self = shift;
2018   my $dbh  = $self->_get_dbh;
2019
2020   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2021   # but it is inaccurate more often than not
2022   eval {
2023     local $dbh->{PrintError} = 0;
2024     local $dbh->{RaiseError} = 1;
2025     $dbh->do('select ?', {}, 1);
2026   };
2027   return $@ ? 0 : 1;
2028 }
2029
2030 # Check if placeholders bound to non-string types throw exceptions
2031 #
2032 sub _typeless_placeholders_supported {
2033   my $self = shift;
2034   my $dbh  = $self->_get_dbh;
2035
2036   eval {
2037     local $dbh->{PrintError} = 0;
2038     local $dbh->{RaiseError} = 1;
2039     # this specifically tests a bind that is NOT a string
2040     $dbh->do('select 1 where 1 = ?', {}, 1);
2041   };
2042   return $@ ? 0 : 1;
2043 }
2044
2045 =head2 sqlt_type
2046
2047 Returns the database driver name.
2048
2049 =cut
2050
2051 sub sqlt_type {
2052   shift->_get_dbh->{Driver}->{Name};
2053 }
2054
2055 =head2 bind_attribute_by_data_type
2056
2057 Given a datatype from column info, returns a database specific bind
2058 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2059 let the database planner just handle it.
2060
2061 Generally only needed for special case column types, like bytea in postgres.
2062
2063 =cut
2064
2065 sub bind_attribute_by_data_type {
2066     return;
2067 }
2068
2069 =head2 is_datatype_numeric
2070
2071 Given a datatype from column_info, returns a boolean value indicating if
2072 the current RDBMS considers it a numeric value. This controls how
2073 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2074 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2075 be performed instead of the usual C<eq>.
2076
2077 =cut
2078
2079 sub is_datatype_numeric {
2080   my ($self, $dt) = @_;
2081
2082   return 0 unless $dt;
2083
2084   return $dt =~ /^ (?:
2085     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2086   ) $/ix;
2087 }
2088
2089
2090 =head2 create_ddl_dir (EXPERIMENTAL)
2091
2092 =over 4
2093
2094 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2095
2096 =back
2097
2098 Creates a SQL file based on the Schema, for each of the specified
2099 database engines in C<\@databases> in the given directory.
2100 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2101
2102 Given a previous version number, this will also create a file containing
2103 the ALTER TABLE statements to transform the previous schema into the
2104 current one. Note that these statements may contain C<DROP TABLE> or
2105 C<DROP COLUMN> statements that can potentially destroy data.
2106
2107 The file names are created using the C<ddl_filename> method below, please
2108 override this method in your schema if you would like a different file
2109 name format. For the ALTER file, the same format is used, replacing
2110 $version in the name with "$preversion-$version".
2111
2112 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2113 The most common value for this would be C<< { add_drop_table => 1 } >>
2114 to have the SQL produced include a C<DROP TABLE> statement for each table
2115 created. For quoting purposes supply C<quote_table_names> and
2116 C<quote_field_names>.
2117
2118 If no arguments are passed, then the following default values are assumed:
2119
2120 =over 4
2121
2122 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2123
2124 =item version    - $schema->schema_version
2125
2126 =item directory  - './'
2127
2128 =item preversion - <none>
2129
2130 =back
2131
2132 By default, C<\%sqlt_args> will have
2133
2134  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2135
2136 merged with the hash passed in. To disable any of those features, pass in a
2137 hashref like the following
2138
2139  { ignore_constraint_names => 0, # ... other options }
2140
2141
2142 Note that this feature is currently EXPERIMENTAL and may not work correctly
2143 across all databases, or fully handle complex relationships.
2144
2145 WARNING: Please check all SQL files created, before applying them.
2146
2147 =cut
2148
2149 sub create_ddl_dir {
2150   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2151
2152   if(!$dir || !-d $dir) {
2153     carp "No directory given, using ./\n";
2154     $dir = "./";
2155   }
2156   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2157   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2158
2159   my $schema_version = $schema->schema_version || '1.x';
2160   $version ||= $schema_version;
2161
2162   $sqltargs = {
2163     add_drop_table => 1,
2164     ignore_constraint_names => 1,
2165     ignore_index_names => 1,
2166     %{$sqltargs || {}}
2167   };
2168
2169   $self->throw_exception("Can't create a ddl file without SQL::Translator: " . $self->_sqlt_version_error)
2170     if !$self->_sqlt_version_ok;
2171
2172   my $sqlt = SQL::Translator->new( $sqltargs );
2173
2174   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2175   my $sqlt_schema = $sqlt->translate({ data => $schema })
2176     or $self->throw_exception ($sqlt->error);
2177
2178   foreach my $db (@$databases) {
2179     $sqlt->reset();
2180     $sqlt->{schema} = $sqlt_schema;
2181     $sqlt->producer($db);
2182
2183     my $file;
2184     my $filename = $schema->ddl_filename($db, $version, $dir);
2185     if (-e $filename && ($version eq $schema_version )) {
2186       # if we are dumping the current version, overwrite the DDL
2187       carp "Overwriting existing DDL file - $filename";
2188       unlink($filename);
2189     }
2190
2191     my $output = $sqlt->translate;
2192     if(!$output) {
2193       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2194       next;
2195     }
2196     if(!open($file, ">$filename")) {
2197       $self->throw_exception("Can't open $filename for writing ($!)");
2198       next;
2199     }
2200     print $file $output;
2201     close($file);
2202
2203     next unless ($preversion);
2204
2205     require SQL::Translator::Diff;
2206
2207     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2208     if(!-e $prefilename) {
2209       carp("No previous schema file found ($prefilename)");
2210       next;
2211     }
2212
2213     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2214     if(-e $difffile) {
2215       carp("Overwriting existing diff file - $difffile");
2216       unlink($difffile);
2217     }
2218
2219     my $source_schema;
2220     {
2221       my $t = SQL::Translator->new($sqltargs);
2222       $t->debug( 0 );
2223       $t->trace( 0 );
2224
2225       $t->parser( $db )
2226         or $self->throw_exception ($t->error);
2227
2228       my $out = $t->translate( $prefilename )
2229         or $self->throw_exception ($t->error);
2230
2231       $source_schema = $t->schema;
2232
2233       $source_schema->name( $prefilename )
2234         unless ( $source_schema->name );
2235     }
2236
2237     # The "new" style of producers have sane normalization and can support
2238     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2239     # And we have to diff parsed SQL against parsed SQL.
2240     my $dest_schema = $sqlt_schema;
2241
2242     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2243       my $t = SQL::Translator->new($sqltargs);
2244       $t->debug( 0 );
2245       $t->trace( 0 );
2246
2247       $t->parser( $db )
2248         or $self->throw_exception ($t->error);
2249
2250       my $out = $t->translate( $filename )
2251         or $self->throw_exception ($t->error);
2252
2253       $dest_schema = $t->schema;
2254
2255       $dest_schema->name( $filename )
2256         unless $dest_schema->name;
2257     }
2258
2259     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2260                                                   $dest_schema,   $db,
2261                                                   $sqltargs
2262                                                  );
2263     if(!open $file, ">$difffile") {
2264       $self->throw_exception("Can't write to $difffile ($!)");
2265       next;
2266     }
2267     print $file $diff;
2268     close($file);
2269   }
2270 }
2271
2272 =head2 deployment_statements
2273
2274 =over 4
2275
2276 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2277
2278 =back
2279
2280 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2281
2282 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2283 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2284
2285 C<$directory> is used to return statements from files in a previously created
2286 L</create_ddl_dir> directory and is optional. The filenames are constructed
2287 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2288
2289 If no C<$directory> is specified then the statements are constructed on the
2290 fly using L<SQL::Translator> and C<$version> is ignored.
2291
2292 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2293
2294 =cut
2295
2296 sub deployment_statements {
2297   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2298   $type ||= $self->sqlt_type;
2299   $version ||= $schema->schema_version || '1.x';
2300   $dir ||= './';
2301   my $filename = $schema->ddl_filename($type, $version, $dir);
2302   if(-f $filename)
2303   {
2304       my $file;
2305       open($file, "<$filename")
2306         or $self->throw_exception("Can't open $filename ($!)");
2307       my @rows = <$file>;
2308       close($file);
2309       return join('', @rows);
2310   }
2311
2312   $self->throw_exception("Can't deploy without either SQL::Translator or a ddl_dir: " . $self->_sqlt_version_error )
2313     if !$self->_sqlt_version_ok;
2314
2315   # sources needs to be a parser arg, but for simplicty allow at top level
2316   # coming in
2317   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2318       if exists $sqltargs->{sources};
2319
2320   my $tr = SQL::Translator->new(
2321     producer => "SQL::Translator::Producer::${type}",
2322     %$sqltargs,
2323     parser => 'SQL::Translator::Parser::DBIx::Class',
2324     data => $schema,
2325   );
2326
2327   my $ret = $tr->translate
2328     or $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error);
2329
2330   return $ret;
2331 }
2332
2333 sub deploy {
2334   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2335   my $deploy = sub {
2336     my $line = shift;
2337     return if($line =~ /^--/);
2338     return if(!$line);
2339     # next if($line =~ /^DROP/m);
2340     return if($line =~ /^BEGIN TRANSACTION/m);
2341     return if($line =~ /^COMMIT/m);
2342     return if $line =~ /^\s+$/; # skip whitespace only
2343     $self->_query_start($line);
2344     eval {
2345       # do a dbh_do cycle here, as we need some error checking in
2346       # place (even though we will ignore errors)
2347       $self->dbh_do (sub { $_[1]->do($line) });
2348     };
2349     if ($@) {
2350       carp qq{$@ (running "${line}")};
2351     }
2352     $self->_query_end($line);
2353   };
2354   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2355   if (@statements > 1) {
2356     foreach my $statement (@statements) {
2357       $deploy->( $statement );
2358     }
2359   }
2360   elsif (@statements == 1) {
2361     foreach my $line ( split(";\n", $statements[0])) {
2362       $deploy->( $line );
2363     }
2364   }
2365 }
2366
2367 =head2 datetime_parser
2368
2369 Returns the datetime parser class
2370
2371 =cut
2372
2373 sub datetime_parser {
2374   my $self = shift;
2375   return $self->{datetime_parser} ||= do {
2376     $self->build_datetime_parser(@_);
2377   };
2378 }
2379
2380 =head2 datetime_parser_type
2381
2382 Defines (returns) the datetime parser class - currently hardwired to
2383 L<DateTime::Format::MySQL>
2384
2385 =cut
2386
2387 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2388
2389 =head2 build_datetime_parser
2390
2391 See L</datetime_parser>
2392
2393 =cut
2394
2395 sub build_datetime_parser {
2396   my $self = shift;
2397   my $type = $self->datetime_parser_type(@_);
2398   $self->ensure_class_loaded ($type);
2399   return $type;
2400 }
2401
2402
2403 =head2 is_replicating
2404
2405 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2406 replicate from a master database.  Default is undef, which is the result
2407 returned by databases that don't support replication.
2408
2409 =cut
2410
2411 sub is_replicating {
2412     return;
2413
2414 }
2415
2416 =head2 lag_behind_master
2417
2418 Returns a number that represents a certain amount of lag behind a master db
2419 when a given storage is replicating.  The number is database dependent, but
2420 starts at zero and increases with the amount of lag. Default in undef
2421
2422 =cut
2423
2424 sub lag_behind_master {
2425     return;
2426 }
2427
2428 # SQLT version handling
2429 {
2430   my $_sqlt_version_ok;     # private
2431   my $_sqlt_version_error;  # private
2432
2433   sub _sqlt_version_ok {
2434     if (!defined $_sqlt_version_ok) {
2435       eval "use SQL::Translator $minimum_sqlt_version";
2436       if ($@) {
2437         $_sqlt_version_ok = 0;
2438         $_sqlt_version_error = $@;
2439       }
2440       else {
2441         $_sqlt_version_ok = 1;
2442       }
2443     }
2444     return $_sqlt_version_ok;
2445   }
2446
2447   sub _sqlt_version_error {
2448     shift->_sqlt_version_ok unless defined $_sqlt_version_ok;
2449     return $_sqlt_version_error;
2450   }
2451
2452   sub _sqlt_minimum_version { $minimum_sqlt_version };
2453 }
2454
2455 sub DESTROY {
2456   my $self = shift;
2457
2458   $self->_verify_pid if $self->_dbh;
2459
2460   # some databases need this to stop spewing warnings
2461   if (my $dbh = $self->_dbh) {
2462     local $@;
2463     eval { $dbh->disconnect };
2464   }
2465
2466   $self->_dbh(undef);
2467 }
2468
2469 1;
2470
2471 =head1 USAGE NOTES
2472
2473 =head2 DBIx::Class and AutoCommit
2474
2475 DBIx::Class can do some wonderful magic with handling exceptions,
2476 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2477 (the default) combined with C<txn_do> for transaction support.
2478
2479 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2480 in an assumed transaction between commits, and you're telling us you'd
2481 like to manage that manually.  A lot of the magic protections offered by
2482 this module will go away.  We can't protect you from exceptions due to database
2483 disconnects because we don't know anything about how to restart your
2484 transactions.  You're on your own for handling all sorts of exceptional
2485 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2486 be with raw DBI.
2487
2488
2489 =head1 AUTHORS
2490
2491 Matt S. Trout <mst@shadowcatsystems.co.uk>
2492
2493 Andy Grundman <andy@hybridized.org>
2494
2495 =head1 LICENSE
2496
2497 You may distribute this code under the same terms as Perl itself.
2498
2499 =cut