Factor some code out
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBI::Hacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17
18 # what version of sqlt do we require if deploy() without a ddl_dir is invoked
19 # when changing also adjust the corresponding author_require in Makefile.PL
20 my $minimum_sqlt_version = '0.11002';
21
22
23 __PACKAGE__->mk_group_accessors('simple' =>
24   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
25      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
26 );
27
28 # the values for these accessors are picked out (and deleted) from
29 # the attribute hashref passed to connect_info
30 my @storage_options = qw/
31   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
32   disable_sth_caching unsafe auto_savepoint
33 /;
34 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
35
36
37 # default cursor class, overridable in connect_info attributes
38 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
39
40 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
41 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
42
43
44 # Each of these methods need _determine_driver called before itself
45 # in order to function reliably. This is a purely DRY optimization
46 my @rdbms_specific_methods = qw/
47   sqlt_type
48   build_datetime_parser
49   datetime_parser_type
50
51   insert
52   insert_bulk
53   update
54   delete
55   select
56   select_single
57 /;
58
59 for my $meth (@rdbms_specific_methods) {
60
61   my $orig = __PACKAGE__->can ($meth)
62     or next;
63
64   no strict qw/refs/;
65   no warnings qw/redefine/;
66   *{__PACKAGE__ ."::$meth"} = sub {
67     if (not $_[0]->_driver_determined) {
68       $_[0]->_determine_driver;
69       goto $_[0]->can($meth);
70     }
71     $orig->(@_);
72   };
73 }
74
75
76 =head1 NAME
77
78 DBIx::Class::Storage::DBI - DBI storage handler
79
80 =head1 SYNOPSIS
81
82   my $schema = MySchema->connect('dbi:SQLite:my.db');
83
84   $schema->storage->debug(1);
85
86   my @stuff = $schema->storage->dbh_do(
87     sub {
88       my ($storage, $dbh, @args) = @_;
89       $dbh->do("DROP TABLE authors");
90     },
91     @column_list
92   );
93
94   $schema->resultset('Book')->search({
95      written_on => $schema->storage->datetime_parser(DateTime->now)
96   });
97
98 =head1 DESCRIPTION
99
100 This class represents the connection to an RDBMS via L<DBI>.  See
101 L<DBIx::Class::Storage> for general information.  This pod only
102 documents DBI-specific methods and behaviors.
103
104 =head1 METHODS
105
106 =cut
107
108 sub new {
109   my $new = shift->next::method(@_);
110
111   $new->transaction_depth(0);
112   $new->_sql_maker_opts({});
113   $new->{savepoints} = [];
114   $new->{_in_dbh_do} = 0;
115   $new->{_dbh_gen} = 0;
116
117   $new;
118 }
119
120 =head2 connect_info
121
122 This method is normally called by L<DBIx::Class::Schema/connection>, which
123 encapsulates its argument list in an arrayref before passing them here.
124
125 The argument list may contain:
126
127 =over
128
129 =item *
130
131 The same 4-element argument set one would normally pass to
132 L<DBI/connect>, optionally followed by
133 L<extra attributes|/DBIx::Class specific connection attributes>
134 recognized by DBIx::Class:
135
136   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
137
138 =item *
139
140 A single code reference which returns a connected
141 L<DBI database handle|DBI/connect> optionally followed by
142 L<extra attributes|/DBIx::Class specific connection attributes> recognized
143 by DBIx::Class:
144
145   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
146
147 =item *
148
149 A single hashref with all the attributes and the dsn/user/password
150 mixed together:
151
152   $connect_info_args = [{
153     dsn => $dsn,
154     user => $user,
155     password => $pass,
156     %dbi_attributes,
157     %extra_attributes,
158   }];
159
160   $connect_info_args = [{
161     dbh_maker => sub { DBI->connect (...) },
162     %dbi_attributes,
163     %extra_attributes,
164   }];
165
166 This is particularly useful for L<Catalyst> based applications, allowing the
167 following config (L<Config::General> style):
168
169   <Model::DB>
170     schema_class   App::DB
171     <connect_info>
172       dsn          dbi:mysql:database=test
173       user         testuser
174       password     TestPass
175       AutoCommit   1
176     </connect_info>
177   </Model::DB>
178
179 The C<dsn>/C<user>/C<password> combination can be substituted by the
180 C<dbh_maker> key whose value is a coderef that returns a connected
181 L<DBI database handle|DBI/connect>
182
183 =back
184
185 Please note that the L<DBI> docs recommend that you always explicitly
186 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
187 recommends that it be set to I<1>, and that you perform transactions
188 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
189 to I<1> if you do not do explicitly set it to zero.  This is the default
190 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
191
192 =head3 DBIx::Class specific connection attributes
193
194 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
195 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
196 the following connection options. These options can be mixed in with your other
197 L<DBI> connection attributes, or placed in a seperate hashref
198 (C<\%extra_attributes>) as shown above.
199
200 Every time C<connect_info> is invoked, any previous settings for
201 these options will be cleared before setting the new ones, regardless of
202 whether any options are specified in the new C<connect_info>.
203
204
205 =over
206
207 =item on_connect_do
208
209 Specifies things to do immediately after connecting or re-connecting to
210 the database.  Its value may contain:
211
212 =over
213
214 =item a scalar
215
216 This contains one SQL statement to execute.
217
218 =item an array reference
219
220 This contains SQL statements to execute in order.  Each element contains
221 a string or a code reference that returns a string.
222
223 =item a code reference
224
225 This contains some code to execute.  Unlike code references within an
226 array reference, its return value is ignored.
227
228 =back
229
230 =item on_disconnect_do
231
232 Takes arguments in the same form as L</on_connect_do> and executes them
233 immediately before disconnecting from the database.
234
235 Note, this only runs if you explicitly call L</disconnect> on the
236 storage object.
237
238 =item on_connect_call
239
240 A more generalized form of L</on_connect_do> that calls the specified
241 C<connect_call_METHOD> methods in your storage driver.
242
243   on_connect_do => 'select 1'
244
245 is equivalent to:
246
247   on_connect_call => [ [ do_sql => 'select 1' ] ]
248
249 Its values may contain:
250
251 =over
252
253 =item a scalar
254
255 Will call the C<connect_call_METHOD> method.
256
257 =item a code reference
258
259 Will execute C<< $code->($storage) >>
260
261 =item an array reference
262
263 Each value can be a method name or code reference.
264
265 =item an array of arrays
266
267 For each array, the first item is taken to be the C<connect_call_> method name
268 or code reference, and the rest are parameters to it.
269
270 =back
271
272 Some predefined storage methods you may use:
273
274 =over
275
276 =item do_sql
277
278 Executes a SQL string or a code reference that returns a SQL string. This is
279 what L</on_connect_do> and L</on_disconnect_do> use.
280
281 It can take:
282
283 =over
284
285 =item a scalar
286
287 Will execute the scalar as SQL.
288
289 =item an arrayref
290
291 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
292 attributes hashref and bind values.
293
294 =item a code reference
295
296 Will execute C<< $code->($storage) >> and execute the return array refs as
297 above.
298
299 =back
300
301 =item datetime_setup
302
303 Execute any statements necessary to initialize the database session to return
304 and accept datetime/timestamp values used with
305 L<DBIx::Class::InflateColumn::DateTime>.
306
307 Only necessary for some databases, see your specific storage driver for
308 implementation details.
309
310 =back
311
312 =item on_disconnect_call
313
314 Takes arguments in the same form as L</on_connect_call> and executes them
315 immediately before disconnecting from the database.
316
317 Calls the C<disconnect_call_METHOD> methods as opposed to the
318 C<connect_call_METHOD> methods called by L</on_connect_call>.
319
320 Note, this only runs if you explicitly call L</disconnect> on the
321 storage object.
322
323 =item disable_sth_caching
324
325 If set to a true value, this option will disable the caching of
326 statement handles via L<DBI/prepare_cached>.
327
328 =item limit_dialect
329
330 Sets the limit dialect. This is useful for JDBC-bridge among others
331 where the remote SQL-dialect cannot be determined by the name of the
332 driver alone. See also L<SQL::Abstract::Limit>.
333
334 =item quote_char
335
336 Specifies what characters to use to quote table and column names. If
337 you use this you will want to specify L</name_sep> as well.
338
339 C<quote_char> expects either a single character, in which case is it
340 is placed on either side of the table/column name, or an arrayref of length
341 2 in which case the table/column name is placed between the elements.
342
343 For example under MySQL you should use C<< quote_char => '`' >>, and for
344 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
345
346 =item name_sep
347
348 This only needs to be used in conjunction with C<quote_char>, and is used to
349 specify the charecter that seperates elements (schemas, tables, columns) from
350 each other. In most cases this is simply a C<.>.
351
352 The consequences of not supplying this value is that L<SQL::Abstract>
353 will assume DBIx::Class' uses of aliases to be complete column
354 names. The output will look like I<"me.name"> when it should actually
355 be I<"me"."name">.
356
357 =item unsafe
358
359 This Storage driver normally installs its own C<HandleError>, sets
360 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
361 all database handles, including those supplied by a coderef.  It does this
362 so that it can have consistent and useful error behavior.
363
364 If you set this option to a true value, Storage will not do its usual
365 modifications to the database handle's attributes, and instead relies on
366 the settings in your connect_info DBI options (or the values you set in
367 your connection coderef, in the case that you are connecting via coderef).
368
369 Note that your custom settings can cause Storage to malfunction,
370 especially if you set a C<HandleError> handler that suppresses exceptions
371 and/or disable C<RaiseError>.
372
373 =item auto_savepoint
374
375 If this option is true, L<DBIx::Class> will use savepoints when nesting
376 transactions, making it possible to recover from failure in the inner
377 transaction without having to abort all outer transactions.
378
379 =item cursor_class
380
381 Use this argument to supply a cursor class other than the default
382 L<DBIx::Class::Storage::DBI::Cursor>.
383
384 =back
385
386 Some real-life examples of arguments to L</connect_info> and
387 L<DBIx::Class::Schema/connect>
388
389   # Simple SQLite connection
390   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
391
392   # Connect via subref
393   ->connect_info([ sub { DBI->connect(...) } ]);
394
395   # Connect via subref in hashref
396   ->connect_info([{
397     dbh_maker => sub { DBI->connect(...) },
398     on_connect_do => 'alter session ...',
399   }]);
400
401   # A bit more complicated
402   ->connect_info(
403     [
404       'dbi:Pg:dbname=foo',
405       'postgres',
406       'my_pg_password',
407       { AutoCommit => 1 },
408       { quote_char => q{"}, name_sep => q{.} },
409     ]
410   );
411
412   # Equivalent to the previous example
413   ->connect_info(
414     [
415       'dbi:Pg:dbname=foo',
416       'postgres',
417       'my_pg_password',
418       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
419     ]
420   );
421
422   # Same, but with hashref as argument
423   # See parse_connect_info for explanation
424   ->connect_info(
425     [{
426       dsn         => 'dbi:Pg:dbname=foo',
427       user        => 'postgres',
428       password    => 'my_pg_password',
429       AutoCommit  => 1,
430       quote_char  => q{"},
431       name_sep    => q{.},
432     }]
433   );
434
435   # Subref + DBIx::Class-specific connection options
436   ->connect_info(
437     [
438       sub { DBI->connect(...) },
439       {
440           quote_char => q{`},
441           name_sep => q{@},
442           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
443           disable_sth_caching => 1,
444       },
445     ]
446   );
447
448
449
450 =cut
451
452 sub connect_info {
453   my ($self, $info_arg) = @_;
454
455   return $self->_connect_info if !$info_arg;
456
457   my @args = @$info_arg;  # take a shallow copy for further mutilation
458   $self->_connect_info([@args]); # copy for _connect_info
459
460
461   # combine/pre-parse arguments depending on invocation style
462
463   my %attrs;
464   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
465     %attrs = %{ $args[1] || {} };
466     @args = $args[0];
467   }
468   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
469     %attrs = %{$args[0]};
470     @args = ();
471     if (my $code = delete $attrs{dbh_maker}) {
472       @args = $code;
473
474       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
475       if (@ignored) {
476         carp sprintf (
477             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
478           . "to the result of 'dbh_maker'",
479
480           join (', ', map { "'$_'" } (@ignored) ),
481         );
482       }
483     }
484     else {
485       @args = delete @attrs{qw/dsn user password/};
486     }
487   }
488   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
489     %attrs = (
490       % { $args[3] || {} },
491       % { $args[4] || {} },
492     );
493     @args = @args[0,1,2];
494   }
495
496   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
497   #  the new set of options
498   $self->_sql_maker(undef);
499   $self->_sql_maker_opts({});
500
501   if(keys %attrs) {
502     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
503       if(my $value = delete $attrs{$storage_opt}) {
504         $self->$storage_opt($value);
505       }
506     }
507     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
508       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
509         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
510       }
511     }
512   }
513
514   if (ref $args[0] eq 'CODE') {
515     # _connect() never looks past $args[0] in this case
516     %attrs = ()
517   } else {
518     %attrs = (
519       %{ $self->_default_dbi_connect_attributes || {} },
520       %attrs,
521     );
522   }
523
524   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
525   $self->_connect_info;
526 }
527
528 sub _default_dbi_connect_attributes {
529   return {
530     AutoCommit => 1,
531     RaiseError => 1,
532     PrintError => 0,
533   };
534 }
535
536 =head2 on_connect_do
537
538 This method is deprecated in favour of setting via L</connect_info>.
539
540 =cut
541
542 =head2 on_disconnect_do
543
544 This method is deprecated in favour of setting via L</connect_info>.
545
546 =cut
547
548 sub _parse_connect_do {
549   my ($self, $type) = @_;
550
551   my $val = $self->$type;
552   return () if not defined $val;
553
554   my @res;
555
556   if (not ref($val)) {
557     push @res, [ 'do_sql', $val ];
558   } elsif (ref($val) eq 'CODE') {
559     push @res, $val;
560   } elsif (ref($val) eq 'ARRAY') {
561     push @res, map { [ 'do_sql', $_ ] } @$val;
562   } else {
563     $self->throw_exception("Invalid type for $type: ".ref($val));
564   }
565
566   return \@res;
567 }
568
569 =head2 dbh_do
570
571 Arguments: ($subref | $method_name), @extra_coderef_args?
572
573 Execute the given $subref or $method_name using the new exception-based
574 connection management.
575
576 The first two arguments will be the storage object that C<dbh_do> was called
577 on and a database handle to use.  Any additional arguments will be passed
578 verbatim to the called subref as arguments 2 and onwards.
579
580 Using this (instead of $self->_dbh or $self->dbh) ensures correct
581 exception handling and reconnection (or failover in future subclasses).
582
583 Your subref should have no side-effects outside of the database, as
584 there is the potential for your subref to be partially double-executed
585 if the database connection was stale/dysfunctional.
586
587 Example:
588
589   my @stuff = $schema->storage->dbh_do(
590     sub {
591       my ($storage, $dbh, @cols) = @_;
592       my $cols = join(q{, }, @cols);
593       $dbh->selectrow_array("SELECT $cols FROM foo");
594     },
595     @column_list
596   );
597
598 =cut
599
600 sub dbh_do {
601   my $self = shift;
602   my $code = shift;
603
604   my $dbh = $self->_get_dbh;
605
606   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
607       || $self->{transaction_depth};
608
609   local $self->{_in_dbh_do} = 1;
610
611   my @result;
612   my $want_array = wantarray;
613
614   eval {
615
616     if($want_array) {
617         @result = $self->$code($dbh, @_);
618     }
619     elsif(defined $want_array) {
620         $result[0] = $self->$code($dbh, @_);
621     }
622     else {
623         $self->$code($dbh, @_);
624     }
625   };
626
627   # ->connected might unset $@ - copy
628   my $exception = $@;
629   if(!$exception) { return $want_array ? @result : $result[0] }
630
631   $self->throw_exception($exception) if $self->connected;
632
633   # We were not connected - reconnect and retry, but let any
634   #  exception fall right through this time
635   carp "Retrying $code after catching disconnected exception: $exception"
636     if $ENV{DBIC_DBIRETRY_DEBUG};
637   $self->_populate_dbh;
638   $self->$code($self->_dbh, @_);
639 }
640
641 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
642 # It also informs dbh_do to bypass itself while under the direction of txn_do,
643 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
644 sub txn_do {
645   my $self = shift;
646   my $coderef = shift;
647
648   ref $coderef eq 'CODE' or $self->throw_exception
649     ('$coderef must be a CODE reference');
650
651   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
652
653   local $self->{_in_dbh_do} = 1;
654
655   my @result;
656   my $want_array = wantarray;
657
658   my $tried = 0;
659   while(1) {
660     eval {
661       $self->_get_dbh;
662
663       $self->txn_begin;
664       if($want_array) {
665           @result = $coderef->(@_);
666       }
667       elsif(defined $want_array) {
668           $result[0] = $coderef->(@_);
669       }
670       else {
671           $coderef->(@_);
672       }
673       $self->txn_commit;
674     };
675
676     # ->connected might unset $@ - copy
677     my $exception = $@;
678     if(!$exception) { return $want_array ? @result : $result[0] }
679
680     if($tried++ || $self->connected) {
681       eval { $self->txn_rollback };
682       my $rollback_exception = $@;
683       if($rollback_exception) {
684         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
685         $self->throw_exception($exception)  # propagate nested rollback
686           if $rollback_exception =~ /$exception_class/;
687
688         $self->throw_exception(
689           "Transaction aborted: ${exception}. "
690           . "Rollback failed: ${rollback_exception}"
691         );
692       }
693       $self->throw_exception($exception)
694     }
695
696     # We were not connected, and was first try - reconnect and retry
697     # via the while loop
698     carp "Retrying $coderef after catching disconnected exception: $exception"
699       if $ENV{DBIC_DBIRETRY_DEBUG};
700     $self->_populate_dbh;
701   }
702 }
703
704 =head2 disconnect
705
706 Our C<disconnect> method also performs a rollback first if the
707 database is not in C<AutoCommit> mode.
708
709 =cut
710
711 sub disconnect {
712   my ($self) = @_;
713
714   if( $self->_dbh ) {
715     my @actions;
716
717     push @actions, ( $self->on_disconnect_call || () );
718     push @actions, $self->_parse_connect_do ('on_disconnect_do');
719
720     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
721
722     $self->_dbh_rollback unless $self->_dbh_autocommit;
723
724     $self->_dbh->disconnect;
725     $self->_dbh(undef);
726     $self->{_dbh_gen}++;
727   }
728 }
729
730 =head2 with_deferred_fk_checks
731
732 =over 4
733
734 =item Arguments: C<$coderef>
735
736 =item Return Value: The return value of $coderef
737
738 =back
739
740 Storage specific method to run the code ref with FK checks deferred or
741 in MySQL's case disabled entirely.
742
743 =cut
744
745 # Storage subclasses should override this
746 sub with_deferred_fk_checks {
747   my ($self, $sub) = @_;
748   $sub->();
749 }
750
751 =head2 connected
752
753 =over
754
755 =item Arguments: none
756
757 =item Return Value: 1|0
758
759 =back
760
761 Verifies that the the current database handle is active and ready to execute
762 an SQL statement (i.e. the connection did not get stale, server is still
763 answering, etc.) This method is used internally by L</dbh>.
764
765 =cut
766
767 sub connected {
768   my $self = shift;
769   return 0 unless $self->_seems_connected;
770
771   #be on the safe side
772   local $self->_dbh->{RaiseError} = 1;
773
774   return $self->_ping;
775 }
776
777 sub _seems_connected {
778   my $self = shift;
779
780   my $dbh = $self->_dbh
781     or return 0;
782
783   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
784     $self->_dbh(undef);
785     $self->{_dbh_gen}++;
786     return 0;
787   }
788   else {
789     $self->_verify_pid;
790     return 0 if !$self->_dbh;
791   }
792
793   return $dbh->FETCH('Active');
794 }
795
796 sub _ping {
797   my $self = shift;
798
799   my $dbh = $self->_dbh or return 0;
800
801   return $dbh->ping;
802 }
803
804 # handle pid changes correctly
805 #  NOTE: assumes $self->_dbh is a valid $dbh
806 sub _verify_pid {
807   my ($self) = @_;
808
809   return if defined $self->_conn_pid && $self->_conn_pid == $$;
810
811   $self->_dbh->{InactiveDestroy} = 1;
812   $self->_dbh(undef);
813   $self->{_dbh_gen}++;
814
815   return;
816 }
817
818 sub ensure_connected {
819   my ($self) = @_;
820
821   unless ($self->connected) {
822     $self->_populate_dbh;
823   }
824 }
825
826 =head2 dbh
827
828 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
829 is guaranteed to be healthy by implicitly calling L</connected>, and if
830 necessary performing a reconnection before returning. Keep in mind that this
831 is very B<expensive> on some database engines. Consider using L<dbh_do>
832 instead.
833
834 =cut
835
836 sub dbh {
837   my ($self) = @_;
838
839   if (not $self->_dbh) {
840     $self->_populate_dbh;
841   } else {
842     $self->ensure_connected;
843   }
844   return $self->_dbh;
845 }
846
847 # this is the internal "get dbh or connect (don't check)" method
848 sub _get_dbh {
849   my $self = shift;
850   $self->_verify_pid if $self->_dbh;
851   $self->_populate_dbh unless $self->_dbh;
852   return $self->_dbh;
853 }
854
855 sub _sql_maker_args {
856     my ($self) = @_;
857
858     return (
859       bindtype=>'columns',
860       array_datatypes => 1,
861       limit_dialect => $self->_get_dbh,
862       %{$self->_sql_maker_opts}
863     );
864 }
865
866 sub sql_maker {
867   my ($self) = @_;
868   unless ($self->_sql_maker) {
869     my $sql_maker_class = $self->sql_maker_class;
870     $self->ensure_class_loaded ($sql_maker_class);
871     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
872   }
873   return $self->_sql_maker;
874 }
875
876 # nothing to do by default
877 sub _rebless {}
878 sub _init {}
879
880 sub _populate_dbh {
881   my ($self) = @_;
882
883   my @info = @{$self->_dbi_connect_info || []};
884   $self->_dbh(undef); # in case ->connected failed we might get sent here
885   $self->_dbh($self->_connect(@info));
886
887   $self->_conn_pid($$);
888   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
889
890   $self->_determine_driver;
891
892   # Always set the transaction depth on connect, since
893   #  there is no transaction in progress by definition
894   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
895
896   $self->_run_connection_actions unless $self->{_in_determine_driver};
897 }
898
899 sub _run_connection_actions {
900   my $self = shift;
901   my @actions;
902
903   push @actions, ( $self->on_connect_call || () );
904   push @actions, $self->_parse_connect_do ('on_connect_do');
905
906   $self->_do_connection_actions(connect_call_ => $_) for @actions;
907 }
908
909 sub _determine_driver {
910   my ($self) = @_;
911
912   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
913     my $started_connected = 0;
914     local $self->{_in_determine_driver} = 1;
915
916     if (ref($self) eq __PACKAGE__) {
917       my $driver;
918       if ($self->_dbh) { # we are connected
919         $driver = $self->_dbh->{Driver}{Name};
920         $started_connected = 1;
921       } else {
922         # if connect_info is a CODEREF, we have no choice but to connect
923         if (ref $self->_dbi_connect_info->[0] &&
924             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
925           $self->_populate_dbh;
926           $driver = $self->_dbh->{Driver}{Name};
927         }
928         else {
929           # try to use dsn to not require being connected, the driver may still
930           # force a connection in _rebless to determine version
931           ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
932         }
933       }
934
935       my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
936       if ($self->load_optional_class($storage_class)) {
937         mro::set_mro($storage_class, 'c3');
938         bless $self, $storage_class;
939         $self->_rebless();
940       }
941     }
942
943     $self->_driver_determined(1);
944
945     $self->_init; # run driver-specific initializations
946
947     $self->_run_connection_actions
948         if !$started_connected && defined $self->_dbh;
949   }
950 }
951
952 sub _do_connection_actions {
953   my $self          = shift;
954   my $method_prefix = shift;
955   my $call          = shift;
956
957   if (not ref($call)) {
958     my $method = $method_prefix . $call;
959     $self->$method(@_);
960   } elsif (ref($call) eq 'CODE') {
961     $self->$call(@_);
962   } elsif (ref($call) eq 'ARRAY') {
963     if (ref($call->[0]) ne 'ARRAY') {
964       $self->_do_connection_actions($method_prefix, $_) for @$call;
965     } else {
966       $self->_do_connection_actions($method_prefix, @$_) for @$call;
967     }
968   } else {
969     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
970   }
971
972   return $self;
973 }
974
975 sub connect_call_do_sql {
976   my $self = shift;
977   $self->_do_query(@_);
978 }
979
980 sub disconnect_call_do_sql {
981   my $self = shift;
982   $self->_do_query(@_);
983 }
984
985 # override in db-specific backend when necessary
986 sub connect_call_datetime_setup { 1 }
987
988 sub _do_query {
989   my ($self, $action) = @_;
990
991   if (ref $action eq 'CODE') {
992     $action = $action->($self);
993     $self->_do_query($_) foreach @$action;
994   }
995   else {
996     # Most debuggers expect ($sql, @bind), so we need to exclude
997     # the attribute hash which is the second argument to $dbh->do
998     # furthermore the bind values are usually to be presented
999     # as named arrayref pairs, so wrap those here too
1000     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1001     my $sql = shift @do_args;
1002     my $attrs = shift @do_args;
1003     my @bind = map { [ undef, $_ ] } @do_args;
1004
1005     $self->_query_start($sql, @bind);
1006     $self->_get_dbh->do($sql, $attrs, @do_args);
1007     $self->_query_end($sql, @bind);
1008   }
1009
1010   return $self;
1011 }
1012
1013 sub _connect {
1014   my ($self, @info) = @_;
1015
1016   $self->throw_exception("You failed to provide any connection info")
1017     if !@info;
1018
1019   my ($old_connect_via, $dbh);
1020
1021   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1022     $old_connect_via = $DBI::connect_via;
1023     $DBI::connect_via = 'connect';
1024   }
1025
1026   eval {
1027     if(ref $info[0] eq 'CODE') {
1028        $dbh = &{$info[0]}
1029     }
1030     else {
1031        $dbh = DBI->connect(@info);
1032     }
1033
1034     if($dbh && !$self->unsafe) {
1035       my $weak_self = $self;
1036       Scalar::Util::weaken($weak_self);
1037       $dbh->{HandleError} = sub {
1038           if ($weak_self) {
1039             $weak_self->throw_exception("DBI Exception: $_[0]");
1040           }
1041           else {
1042             # the handler may be invoked by something totally out of
1043             # the scope of DBIC
1044             croak ("DBI Exception: $_[0]");
1045           }
1046       };
1047       $dbh->{ShowErrorStatement} = 1;
1048       $dbh->{RaiseError} = 1;
1049       $dbh->{PrintError} = 0;
1050     }
1051   };
1052
1053   $DBI::connect_via = $old_connect_via if $old_connect_via;
1054
1055   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1056     if !$dbh || $@;
1057
1058   $self->_dbh_autocommit($dbh->{AutoCommit});
1059
1060   $dbh;
1061 }
1062
1063 sub svp_begin {
1064   my ($self, $name) = @_;
1065
1066   $name = $self->_svp_generate_name
1067     unless defined $name;
1068
1069   $self->throw_exception ("You can't use savepoints outside a transaction")
1070     if $self->{transaction_depth} == 0;
1071
1072   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1073     unless $self->can('_svp_begin');
1074
1075   push @{ $self->{savepoints} }, $name;
1076
1077   $self->debugobj->svp_begin($name) if $self->debug;
1078
1079   return $self->_svp_begin($name);
1080 }
1081
1082 sub svp_release {
1083   my ($self, $name) = @_;
1084
1085   $self->throw_exception ("You can't use savepoints outside a transaction")
1086     if $self->{transaction_depth} == 0;
1087
1088   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1089     unless $self->can('_svp_release');
1090
1091   if (defined $name) {
1092     $self->throw_exception ("Savepoint '$name' does not exist")
1093       unless grep { $_ eq $name } @{ $self->{savepoints} };
1094
1095     # Dig through the stack until we find the one we are releasing.  This keeps
1096     # the stack up to date.
1097     my $svp;
1098
1099     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1100   } else {
1101     $name = pop @{ $self->{savepoints} };
1102   }
1103
1104   $self->debugobj->svp_release($name) if $self->debug;
1105
1106   return $self->_svp_release($name);
1107 }
1108
1109 sub svp_rollback {
1110   my ($self, $name) = @_;
1111
1112   $self->throw_exception ("You can't use savepoints outside a transaction")
1113     if $self->{transaction_depth} == 0;
1114
1115   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1116     unless $self->can('_svp_rollback');
1117
1118   if (defined $name) {
1119       # If they passed us a name, verify that it exists in the stack
1120       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1121           $self->throw_exception("Savepoint '$name' does not exist!");
1122       }
1123
1124       # Dig through the stack until we find the one we are releasing.  This keeps
1125       # the stack up to date.
1126       while(my $s = pop(@{ $self->{savepoints} })) {
1127           last if($s eq $name);
1128       }
1129       # Add the savepoint back to the stack, as a rollback doesn't remove the
1130       # named savepoint, only everything after it.
1131       push(@{ $self->{savepoints} }, $name);
1132   } else {
1133       # We'll assume they want to rollback to the last savepoint
1134       $name = $self->{savepoints}->[-1];
1135   }
1136
1137   $self->debugobj->svp_rollback($name) if $self->debug;
1138
1139   return $self->_svp_rollback($name);
1140 }
1141
1142 sub _svp_generate_name {
1143     my ($self) = @_;
1144
1145     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1146 }
1147
1148 sub txn_begin {
1149   my $self = shift;
1150   if($self->{transaction_depth} == 0) {
1151     $self->debugobj->txn_begin()
1152       if $self->debug;
1153     $self->_dbh_begin_work;
1154   }
1155   elsif ($self->auto_savepoint) {
1156     $self->svp_begin;
1157   }
1158   $self->{transaction_depth}++;
1159 }
1160
1161 sub _dbh_begin_work {
1162   my $self = shift;
1163
1164   # if the user is utilizing txn_do - good for him, otherwise we need to
1165   # ensure that the $dbh is healthy on BEGIN.
1166   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1167   # will be replaced by a failure of begin_work itself (which will be
1168   # then retried on reconnect)
1169   if ($self->{_in_dbh_do}) {
1170     $self->_dbh->begin_work;
1171   } else {
1172     $self->dbh_do(sub { $_[1]->begin_work });
1173   }
1174 }
1175
1176 sub txn_commit {
1177   my $self = shift;
1178   if ($self->{transaction_depth} == 1) {
1179     $self->debugobj->txn_commit()
1180       if ($self->debug);
1181     $self->_dbh_commit;
1182     $self->{transaction_depth} = 0
1183       if $self->_dbh_autocommit;
1184   }
1185   elsif($self->{transaction_depth} > 1) {
1186     $self->{transaction_depth}--;
1187     $self->svp_release
1188       if $self->auto_savepoint;
1189   }
1190 }
1191
1192 sub _dbh_commit {
1193   my $self = shift;
1194   my $dbh  = $self->_dbh
1195     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1196   $dbh->commit;
1197 }
1198
1199 sub txn_rollback {
1200   my $self = shift;
1201   my $dbh = $self->_dbh;
1202   eval {
1203     if ($self->{transaction_depth} == 1) {
1204       $self->debugobj->txn_rollback()
1205         if ($self->debug);
1206       $self->{transaction_depth} = 0
1207         if $self->_dbh_autocommit;
1208       $self->_dbh_rollback;
1209     }
1210     elsif($self->{transaction_depth} > 1) {
1211       $self->{transaction_depth}--;
1212       if ($self->auto_savepoint) {
1213         $self->svp_rollback;
1214         $self->svp_release;
1215       }
1216     }
1217     else {
1218       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1219     }
1220   };
1221   if ($@) {
1222     my $error = $@;
1223     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1224     $error =~ /$exception_class/ and $self->throw_exception($error);
1225     # ensure that a failed rollback resets the transaction depth
1226     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1227     $self->throw_exception($error);
1228   }
1229 }
1230
1231 sub _dbh_rollback {
1232   my $self = shift;
1233   my $dbh  = $self->_dbh
1234     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1235   $dbh->rollback;
1236 }
1237
1238 # This used to be the top-half of _execute.  It was split out to make it
1239 #  easier to override in NoBindVars without duping the rest.  It takes up
1240 #  all of _execute's args, and emits $sql, @bind.
1241 sub _prep_for_execute {
1242   my ($self, $op, $extra_bind, $ident, $args) = @_;
1243
1244   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1245     $ident = $ident->from();
1246   }
1247
1248   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1249
1250   unshift(@bind,
1251     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1252       if $extra_bind;
1253   return ($sql, \@bind);
1254 }
1255
1256
1257 sub _fix_bind_params {
1258     my ($self, @bind) = @_;
1259
1260     ### Turn @bind from something like this:
1261     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1262     ### to this:
1263     ###   ( "'1'", "'1'", "'3'" )
1264     return
1265         map {
1266             if ( defined( $_ && $_->[1] ) ) {
1267                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1268             }
1269             else { q{'NULL'}; }
1270         } @bind;
1271 }
1272
1273 sub _query_start {
1274     my ( $self, $sql, @bind ) = @_;
1275
1276     if ( $self->debug ) {
1277         @bind = $self->_fix_bind_params(@bind);
1278
1279         $self->debugobj->query_start( $sql, @bind );
1280     }
1281 }
1282
1283 sub _query_end {
1284     my ( $self, $sql, @bind ) = @_;
1285
1286     if ( $self->debug ) {
1287         @bind = $self->_fix_bind_params(@bind);
1288         $self->debugobj->query_end( $sql, @bind );
1289     }
1290 }
1291
1292 sub _dbh_execute {
1293   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1294
1295   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1296
1297   $self->_query_start( $sql, @$bind );
1298
1299   my $sth = $self->sth($sql,$op);
1300
1301   my $placeholder_index = 1;
1302
1303   foreach my $bound (@$bind) {
1304     my $attributes = {};
1305     my($column_name, @data) = @$bound;
1306
1307     if ($bind_attributes) {
1308       $attributes = $bind_attributes->{$column_name}
1309       if defined $bind_attributes->{$column_name};
1310     }
1311
1312     foreach my $data (@data) {
1313       my $ref = ref $data;
1314       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1315
1316       $sth->bind_param($placeholder_index, $data, $attributes);
1317       $placeholder_index++;
1318     }
1319   }
1320
1321   # Can this fail without throwing an exception anyways???
1322   my $rv = $sth->execute();
1323   $self->throw_exception($sth->errstr) if !$rv;
1324
1325   $self->_query_end( $sql, @$bind );
1326
1327   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1328 }
1329
1330 sub _execute {
1331     my $self = shift;
1332     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1333 }
1334
1335 sub insert {
1336   my ($self, $source, $to_insert) = @_;
1337
1338   my $ident = $source->from;
1339   my $bind_attributes = $self->source_bind_attributes($source);
1340
1341   my $updated_cols = {};
1342
1343   foreach my $col ( $source->columns ) {
1344     if ( !defined $to_insert->{$col} ) {
1345       my $col_info = $source->column_info($col);
1346
1347       if ( $col_info->{auto_nextval} ) {
1348         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1349           'nextval',
1350           $col_info->{sequence} ||
1351             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
1352         );
1353       }
1354     }
1355   }
1356
1357   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1358
1359   return $updated_cols;
1360 }
1361
1362 ## Still not quite perfect, and EXPERIMENTAL
1363 ## Currently it is assumed that all values passed will be "normal", i.e. not
1364 ## scalar refs, or at least, all the same type as the first set, the statement is
1365 ## only prepped once.
1366 sub insert_bulk {
1367   my ($self, $source, $cols, $data) = @_;
1368
1369   my %colvalues;
1370   @colvalues{@$cols} = (0..$#$cols);
1371
1372   for my $i (0..$#$cols) {
1373     my $first_val = $data->[0][$i];
1374     next unless ref $first_val eq 'SCALAR';
1375
1376     $colvalues{ $cols->[$i] } = $first_val;
1377   }
1378
1379   # check for bad data and stringify stringifiable objects
1380   my $bad_slice = sub {
1381     my ($msg, $col_idx, $slice_idx) = @_;
1382     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1383       $msg,
1384       $cols->[$col_idx],
1385       do {
1386         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1387         Data::Dumper::Concise::Dumper({
1388           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1389         }),
1390       }
1391     );
1392   };
1393
1394   for my $datum_idx (0..$#$data) {
1395     my $datum = $data->[$datum_idx];
1396
1397     for my $col_idx (0..$#$cols) {
1398       my $val            = $datum->[$col_idx];
1399       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1400       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1401
1402       if ($is_literal_sql) {
1403         if (not ref $val) {
1404           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1405         }
1406         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1407           $bad_slice->("$reftype reference found where literal SQL expected",
1408             $col_idx, $datum_idx);
1409         }
1410         elsif ($$val ne $$sqla_bind){
1411           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1412             $col_idx, $datum_idx);
1413         }
1414       }
1415       elsif (my $reftype = ref $val) {
1416         require overload;
1417         if (overload::Method($val, '""')) {
1418           $datum->[$col_idx] = "".$val;
1419         }
1420         else {
1421           $bad_slice->("$reftype reference found where bind expected",
1422             $col_idx, $datum_idx);
1423         }
1424       }
1425     }
1426   }
1427
1428   my ($sql, $bind) = $self->_prep_for_execute (
1429     'insert', undef, $source, [\%colvalues]
1430   );
1431   my @bind = @$bind;
1432
1433   my $empty_bind = 1 if (not @bind) &&
1434     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1435
1436   if ((not @bind) && (not $empty_bind)) {
1437     $self->throw_exception(
1438       'Cannot insert_bulk without support for placeholders'
1439     );
1440   }
1441
1442   $self->_query_start( $sql, ['__BULK__'] );
1443   my $sth = $self->sth($sql);
1444
1445   my $rv = do {
1446     if ($empty_bind) {
1447       # bind_param_array doesn't work if there are no binds
1448       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1449     }
1450     else {
1451 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1452       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1453     }
1454   };
1455
1456   $self->_query_end( $sql, ['__BULK__'] );
1457
1458   return (wantarray ? ($rv, $sth, @bind) : $rv);
1459 }
1460
1461 sub _execute_array {
1462   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1463
1464   my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
1465
1466   ## This must be an arrayref, else nothing works!
1467   my $tuple_status = [];
1468
1469   ## Get the bind_attributes, if any exist
1470   my $bind_attributes = $self->source_bind_attributes($source);
1471
1472   ## Bind the values and execute
1473   my $placeholder_index = 1;
1474
1475   foreach my $bound (@$bind) {
1476
1477     my $attributes = {};
1478     my ($column_name, $data_index) = @$bound;
1479
1480     if( $bind_attributes ) {
1481       $attributes = $bind_attributes->{$column_name}
1482       if defined $bind_attributes->{$column_name};
1483     }
1484
1485     my @data = map { $_->[$data_index] } @$data;
1486
1487     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1488     $placeholder_index++;
1489   }
1490
1491   my $rv = eval {
1492     $self->_dbh_execute_array($sth, $tuple_status, @extra);
1493   };
1494   my $err = $@ || $sth->errstr;
1495
1496 # Statement must finish even if there was an exception.
1497   eval { $sth->finish };
1498   $err = $@ unless $err;
1499
1500   if ($err) {
1501     my $i = 0;
1502     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1503
1504     $self->throw_exception("Unexpected populate error: $err")
1505       if ($i > $#$tuple_status);
1506
1507     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1508       ($tuple_status->[$i][1] || $err),
1509       Data::Dumper::Concise::Dumper({
1510         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1511       }),
1512     );
1513   }
1514
1515   $guard->commit if $guard;
1516
1517   return $rv;
1518 }
1519
1520 sub _dbh_execute_array {
1521     my ($self, $sth, $tuple_status, @extra) = @_;
1522
1523     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1524 }
1525
1526 sub _dbh_execute_inserts_with_no_binds {
1527   my ($self, $sth, $count) = @_;
1528
1529   my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
1530
1531   eval {
1532     my $dbh = $self->_get_dbh;
1533     local $dbh->{RaiseError} = 1;
1534     local $dbh->{PrintError} = 0;
1535
1536     $sth->execute foreach 1..$count;
1537   };
1538   my $exception = $@;
1539
1540 # Make sure statement is finished even if there was an exception.
1541   eval { $sth->finish };
1542   $exception = $@ unless $exception;
1543
1544   $self->throw_exception($exception) if $exception;
1545
1546   $guard->commit if $guard;
1547
1548   return $count;
1549 }
1550
1551 sub update {
1552   my ($self, $source, @args) = @_; 
1553
1554   my $bind_attrs = $self->source_bind_attributes($source);
1555
1556   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1557 }
1558
1559
1560 sub delete {
1561   my ($self, $source, @args) = @_; 
1562
1563   my $bind_attrs = $self->source_bind_attributes($source);
1564
1565   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1566 }
1567
1568 # We were sent here because the $rs contains a complex search
1569 # which will require a subquery to select the correct rows
1570 # (i.e. joined or limited resultsets)
1571 #
1572 # Generating a single PK column subquery is trivial and supported
1573 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1574 # Look at _multipk_update_delete()
1575 sub _subq_update_delete {
1576   my $self = shift;
1577   my ($rs, $op, $values) = @_;
1578
1579   my $rsrc = $rs->result_source;
1580
1581   # we already check this, but double check naively just in case. Should be removed soon
1582   my $sel = $rs->_resolved_attrs->{select};
1583   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1584   my @pcols = $rsrc->primary_columns;
1585   if (@$sel != @pcols) {
1586     $self->throw_exception (
1587       'Subquery update/delete can not be called on resultsets selecting a'
1588      .' number of columns different than the number of primary keys'
1589     );
1590   }
1591
1592   if (@pcols == 1) {
1593     return $self->$op (
1594       $rsrc,
1595       $op eq 'update' ? $values : (),
1596       { $pcols[0] => { -in => $rs->as_query } },
1597     );
1598   }
1599
1600   else {
1601     return $self->_multipk_update_delete (@_);
1602   }
1603 }
1604
1605 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1606 # resultset update/delete involving subqueries. So by default resort
1607 # to simple (and inefficient) delete_all style per-row opearations,
1608 # while allowing specific storages to override this with a faster
1609 # implementation.
1610 #
1611 sub _multipk_update_delete {
1612   return shift->_per_row_update_delete (@_);
1613 }
1614
1615 # This is the default loop used to delete/update rows for multi PK
1616 # resultsets, and used by mysql exclusively (because it can't do anything
1617 # else).
1618 #
1619 # We do not use $row->$op style queries, because resultset update/delete
1620 # is not expected to cascade (this is what delete_all/update_all is for).
1621 #
1622 # There should be no race conditions as the entire operation is rolled
1623 # in a transaction.
1624 #
1625 sub _per_row_update_delete {
1626   my $self = shift;
1627   my ($rs, $op, $values) = @_;
1628
1629   my $rsrc = $rs->result_source;
1630   my @pcols = $rsrc->primary_columns;
1631
1632   my $guard = $self->txn_scope_guard;
1633
1634   # emulate the return value of $sth->execute for non-selects
1635   my $row_cnt = '0E0';
1636
1637   my $subrs_cur = $rs->cursor;
1638   while (my @pks = $subrs_cur->next) {
1639
1640     my $cond;
1641     for my $i (0.. $#pcols) {
1642       $cond->{$pcols[$i]} = $pks[$i];
1643     }
1644
1645     $self->$op (
1646       $rsrc,
1647       $op eq 'update' ? $values : (),
1648       $cond,
1649     );
1650
1651     $row_cnt++;
1652   }
1653
1654   $guard->commit;
1655
1656   return $row_cnt;
1657 }
1658
1659 sub _select {
1660   my $self = shift;
1661
1662   # localization is neccessary as
1663   # 1) there is no infrastructure to pass this around before SQLA2
1664   # 2) _select_args sets it and _prep_for_execute consumes it
1665   my $sql_maker = $self->sql_maker;
1666   local $sql_maker->{_dbic_rs_attrs};
1667
1668   return $self->_execute($self->_select_args(@_));
1669 }
1670
1671 sub _select_args_to_query {
1672   my $self = shift;
1673
1674   # localization is neccessary as
1675   # 1) there is no infrastructure to pass this around before SQLA2
1676   # 2) _select_args sets it and _prep_for_execute consumes it
1677   my $sql_maker = $self->sql_maker;
1678   local $sql_maker->{_dbic_rs_attrs};
1679
1680   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1681   #  = $self->_select_args($ident, $select, $cond, $attrs);
1682   my ($op, $bind, $ident, $bind_attrs, @args) =
1683     $self->_select_args(@_);
1684
1685   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1686   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1687   $prepared_bind ||= [];
1688
1689   return wantarray
1690     ? ($sql, $prepared_bind, $bind_attrs)
1691     : \[ "($sql)", @$prepared_bind ]
1692   ;
1693 }
1694
1695 sub _select_args {
1696   my ($self, $ident, $select, $where, $attrs) = @_;
1697
1698   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1699
1700   my $sql_maker = $self->sql_maker;
1701   $sql_maker->{_dbic_rs_attrs} = {
1702     %$attrs,
1703     select => $select,
1704     from => $ident,
1705     where => $where,
1706     $rs_alias
1707       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1708       : ()
1709     ,
1710   };
1711
1712   # calculate bind_attrs before possible $ident mangling
1713   my $bind_attrs = {};
1714   for my $alias (keys %$alias2source) {
1715     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1716     for my $col (keys %$bindtypes) {
1717
1718       my $fqcn = join ('.', $alias, $col);
1719       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1720
1721       # Unqialified column names are nice, but at the same time can be
1722       # rather ambiguous. What we do here is basically go along with
1723       # the loop, adding an unqualified column slot to $bind_attrs,
1724       # alongside the fully qualified name. As soon as we encounter
1725       # another column by that name (which would imply another table)
1726       # we unset the unqualified slot and never add any info to it
1727       # to avoid erroneous type binding. If this happens the users
1728       # only choice will be to fully qualify his column name
1729
1730       if (exists $bind_attrs->{$col}) {
1731         $bind_attrs->{$col} = {};
1732       }
1733       else {
1734         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1735       }
1736     }
1737   }
1738
1739   # adjust limits
1740   if (
1741     $attrs->{software_limit}
1742       ||
1743     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1744   ) {
1745     $attrs->{software_limit} = 1;
1746   }
1747   else {
1748     $self->throw_exception("rows attribute must be positive if present")
1749       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1750
1751     # MySQL actually recommends this approach.  I cringe.
1752     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1753   }
1754
1755   my @limit;
1756
1757   # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
1758   # otherwise delegate the limiting to the storage, unless software limit was requested
1759   if (
1760     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1761        ||
1762     ( $attrs->{group_by} && @{$attrs->{group_by}} &&
1763       $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
1764   ) {
1765     ($ident, $select, $where, $attrs)
1766       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1767   }
1768   elsif (! $attrs->{software_limit} ) {
1769     push @limit, $attrs->{rows}, $attrs->{offset};
1770   }
1771
1772 ###
1773   # This would be the point to deflate anything found in $where
1774   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1775   # expect a row object. And all we have is a resultsource (it is trivial
1776   # to extract deflator coderefs via $alias2source above).
1777   #
1778   # I don't see a way forward other than changing the way deflators are
1779   # invoked, and that's just bad...
1780 ###
1781
1782   my $order = { map
1783     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1784     (qw/order_by group_by having/ )
1785   };
1786
1787   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1788 }
1789
1790 # Returns a counting SELECT for a simple count
1791 # query. Abstracted so that a storage could override
1792 # this to { count => 'firstcol' } or whatever makes
1793 # sense as a performance optimization
1794 sub _count_select {
1795   #my ($self, $source, $rs_attrs) = @_;
1796   return { count => '*' };
1797 }
1798
1799 # Returns a SELECT which will end up in the subselect
1800 # There may or may not be a group_by, as the subquery
1801 # might have been called to accomodate a limit
1802 #
1803 # Most databases would be happy with whatever ends up
1804 # here, but some choke in various ways.
1805 #
1806 sub _subq_count_select {
1807   my ($self, $source, $rs_attrs) = @_;
1808   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
1809
1810   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1811   return @pcols ? \@pcols : [ 1 ];
1812 }
1813
1814 sub source_bind_attributes {
1815   my ($self, $source) = @_;
1816
1817   my $bind_attributes;
1818   foreach my $column ($source->columns) {
1819
1820     my $data_type = $source->column_info($column)->{data_type} || '';
1821     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1822      if $data_type;
1823   }
1824
1825   return $bind_attributes;
1826 }
1827
1828 =head2 select
1829
1830 =over 4
1831
1832 =item Arguments: $ident, $select, $condition, $attrs
1833
1834 =back
1835
1836 Handle a SQL select statement.
1837
1838 =cut
1839
1840 sub select {
1841   my $self = shift;
1842   my ($ident, $select, $condition, $attrs) = @_;
1843   return $self->cursor_class->new($self, \@_, $attrs);
1844 }
1845
1846 sub select_single {
1847   my $self = shift;
1848   my ($rv, $sth, @bind) = $self->_select(@_);
1849   my @row = $sth->fetchrow_array;
1850   my @nextrow = $sth->fetchrow_array if @row;
1851   if(@row && @nextrow) {
1852     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1853   }
1854   # Need to call finish() to work round broken DBDs
1855   $sth->finish();
1856   return @row;
1857 }
1858
1859 =head2 sth
1860
1861 =over 4
1862
1863 =item Arguments: $sql
1864
1865 =back
1866
1867 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1868
1869 =cut
1870
1871 sub _dbh_sth {
1872   my ($self, $dbh, $sql) = @_;
1873
1874   # 3 is the if_active parameter which avoids active sth re-use
1875   my $sth = $self->disable_sth_caching
1876     ? $dbh->prepare($sql)
1877     : $dbh->prepare_cached($sql, {}, 3);
1878
1879   # XXX You would think RaiseError would make this impossible,
1880   #  but apparently that's not true :(
1881   $self->throw_exception($dbh->errstr) if !$sth;
1882
1883   $sth;
1884 }
1885
1886 sub sth {
1887   my ($self, $sql) = @_;
1888   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
1889 }
1890
1891 sub _dbh_columns_info_for {
1892   my ($self, $dbh, $table) = @_;
1893
1894   if ($dbh->can('column_info')) {
1895     my %result;
1896     eval {
1897       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1898       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1899       $sth->execute();
1900       while ( my $info = $sth->fetchrow_hashref() ){
1901         my %column_info;
1902         $column_info{data_type}   = $info->{TYPE_NAME};
1903         $column_info{size}      = $info->{COLUMN_SIZE};
1904         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1905         $column_info{default_value} = $info->{COLUMN_DEF};
1906         my $col_name = $info->{COLUMN_NAME};
1907         $col_name =~ s/^\"(.*)\"$/$1/;
1908
1909         $result{$col_name} = \%column_info;
1910       }
1911     };
1912     return \%result if !$@ && scalar keys %result;
1913   }
1914
1915   my %result;
1916   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1917   $sth->execute;
1918   my @columns = @{$sth->{NAME_lc}};
1919   for my $i ( 0 .. $#columns ){
1920     my %column_info;
1921     $column_info{data_type} = $sth->{TYPE}->[$i];
1922     $column_info{size} = $sth->{PRECISION}->[$i];
1923     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1924
1925     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1926       $column_info{data_type} = $1;
1927       $column_info{size}    = $2;
1928     }
1929
1930     $result{$columns[$i]} = \%column_info;
1931   }
1932   $sth->finish;
1933
1934   foreach my $col (keys %result) {
1935     my $colinfo = $result{$col};
1936     my $type_num = $colinfo->{data_type};
1937     my $type_name;
1938     if(defined $type_num && $dbh->can('type_info')) {
1939       my $type_info = $dbh->type_info($type_num);
1940       $type_name = $type_info->{TYPE_NAME} if $type_info;
1941       $colinfo->{data_type} = $type_name if $type_name;
1942     }
1943   }
1944
1945   return \%result;
1946 }
1947
1948 sub columns_info_for {
1949   my ($self, $table) = @_;
1950   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
1951 }
1952
1953 =head2 last_insert_id
1954
1955 Return the row id of the last insert.
1956
1957 =cut
1958
1959 sub _dbh_last_insert_id {
1960     # All Storage's need to register their own _dbh_last_insert_id
1961     # the old SQLite-based method was highly inappropriate
1962
1963     my $self = shift;
1964     my $class = ref $self;
1965     $self->throw_exception (<<EOE);
1966
1967 No _dbh_last_insert_id() method found in $class.
1968 Since the method of obtaining the autoincrement id of the last insert
1969 operation varies greatly between different databases, this method must be
1970 individually implemented for every storage class.
1971 EOE
1972 }
1973
1974 sub last_insert_id {
1975   my $self = shift;
1976   $self->_dbh_last_insert_id ($self->_dbh, @_);
1977 }
1978
1979 =head2 _native_data_type
1980
1981 =over 4
1982
1983 =item Arguments: $type_name
1984
1985 =back
1986
1987 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
1988 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
1989 L<::Sybase|DBIx::Class::Storage::DBI::Sybase>.
1990
1991 The default implementation returns C<undef>, implement in your Storage driver if
1992 you need this functionality.
1993
1994 Should map types from other databases to the native RDBMS type, for example
1995 C<VARCHAR2> to C<VARCHAR>.
1996
1997 Types with modifiers should map to the underlying data type. For example,
1998 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
1999
2000 Composite types should map to the container type, for example
2001 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2002
2003 =cut
2004
2005 sub _native_data_type {
2006   #my ($self, $data_type) = @_;
2007   return undef
2008 }
2009
2010 # Check if placeholders are supported at all
2011 sub _placeholders_supported {
2012   my $self = shift;
2013   my $dbh  = $self->_get_dbh;
2014
2015   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2016   # but it is inaccurate more often than not
2017   eval {
2018     local $dbh->{PrintError} = 0;
2019     local $dbh->{RaiseError} = 1;
2020     $dbh->do('select ?', {}, 1);
2021   };
2022   return $@ ? 0 : 1;
2023 }
2024
2025 # Check if placeholders bound to non-string types throw exceptions
2026 #
2027 sub _typeless_placeholders_supported {
2028   my $self = shift;
2029   my $dbh  = $self->_get_dbh;
2030
2031   eval {
2032     local $dbh->{PrintError} = 0;
2033     local $dbh->{RaiseError} = 1;
2034     # this specifically tests a bind that is NOT a string
2035     $dbh->do('select 1 where 1 = ?', {}, 1);
2036   };
2037   return $@ ? 0 : 1;
2038 }
2039
2040 =head2 sqlt_type
2041
2042 Returns the database driver name.
2043
2044 =cut
2045
2046 sub sqlt_type {
2047   shift->_get_dbh->{Driver}->{Name};
2048 }
2049
2050 =head2 bind_attribute_by_data_type
2051
2052 Given a datatype from column info, returns a database specific bind
2053 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2054 let the database planner just handle it.
2055
2056 Generally only needed for special case column types, like bytea in postgres.
2057
2058 =cut
2059
2060 sub bind_attribute_by_data_type {
2061     return;
2062 }
2063
2064 =head2 is_datatype_numeric
2065
2066 Given a datatype from column_info, returns a boolean value indicating if
2067 the current RDBMS considers it a numeric value. This controls how
2068 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2069 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2070 be performed instead of the usual C<eq>.
2071
2072 =cut
2073
2074 sub is_datatype_numeric {
2075   my ($self, $dt) = @_;
2076
2077   return 0 unless $dt;
2078
2079   return $dt =~ /^ (?:
2080     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2081   ) $/ix;
2082 }
2083
2084
2085 =head2 create_ddl_dir (EXPERIMENTAL)
2086
2087 =over 4
2088
2089 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2090
2091 =back
2092
2093 Creates a SQL file based on the Schema, for each of the specified
2094 database engines in C<\@databases> in the given directory.
2095 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2096
2097 Given a previous version number, this will also create a file containing
2098 the ALTER TABLE statements to transform the previous schema into the
2099 current one. Note that these statements may contain C<DROP TABLE> or
2100 C<DROP COLUMN> statements that can potentially destroy data.
2101
2102 The file names are created using the C<ddl_filename> method below, please
2103 override this method in your schema if you would like a different file
2104 name format. For the ALTER file, the same format is used, replacing
2105 $version in the name with "$preversion-$version".
2106
2107 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2108 The most common value for this would be C<< { add_drop_table => 1 } >>
2109 to have the SQL produced include a C<DROP TABLE> statement for each table
2110 created. For quoting purposes supply C<quote_table_names> and
2111 C<quote_field_names>.
2112
2113 If no arguments are passed, then the following default values are assumed:
2114
2115 =over 4
2116
2117 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2118
2119 =item version    - $schema->schema_version
2120
2121 =item directory  - './'
2122
2123 =item preversion - <none>
2124
2125 =back
2126
2127 By default, C<\%sqlt_args> will have
2128
2129  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2130
2131 merged with the hash passed in. To disable any of those features, pass in a
2132 hashref like the following
2133
2134  { ignore_constraint_names => 0, # ... other options }
2135
2136
2137 Note that this feature is currently EXPERIMENTAL and may not work correctly
2138 across all databases, or fully handle complex relationships.
2139
2140 WARNING: Please check all SQL files created, before applying them.
2141
2142 =cut
2143
2144 sub create_ddl_dir {
2145   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2146
2147   if(!$dir || !-d $dir) {
2148     carp "No directory given, using ./\n";
2149     $dir = "./";
2150   }
2151   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2152   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2153
2154   my $schema_version = $schema->schema_version || '1.x';
2155   $version ||= $schema_version;
2156
2157   $sqltargs = {
2158     add_drop_table => 1,
2159     ignore_constraint_names => 1,
2160     ignore_index_names => 1,
2161     %{$sqltargs || {}}
2162   };
2163
2164   $self->throw_exception("Can't create a ddl file without SQL::Translator: " . $self->_sqlt_version_error)
2165     if !$self->_sqlt_version_ok;
2166
2167   my $sqlt = SQL::Translator->new( $sqltargs );
2168
2169   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2170   my $sqlt_schema = $sqlt->translate({ data => $schema })
2171     or $self->throw_exception ($sqlt->error);
2172
2173   foreach my $db (@$databases) {
2174     $sqlt->reset();
2175     $sqlt->{schema} = $sqlt_schema;
2176     $sqlt->producer($db);
2177
2178     my $file;
2179     my $filename = $schema->ddl_filename($db, $version, $dir);
2180     if (-e $filename && ($version eq $schema_version )) {
2181       # if we are dumping the current version, overwrite the DDL
2182       carp "Overwriting existing DDL file - $filename";
2183       unlink($filename);
2184     }
2185
2186     my $output = $sqlt->translate;
2187     if(!$output) {
2188       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2189       next;
2190     }
2191     if(!open($file, ">$filename")) {
2192       $self->throw_exception("Can't open $filename for writing ($!)");
2193       next;
2194     }
2195     print $file $output;
2196     close($file);
2197
2198     next unless ($preversion);
2199
2200     require SQL::Translator::Diff;
2201
2202     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2203     if(!-e $prefilename) {
2204       carp("No previous schema file found ($prefilename)");
2205       next;
2206     }
2207
2208     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2209     if(-e $difffile) {
2210       carp("Overwriting existing diff file - $difffile");
2211       unlink($difffile);
2212     }
2213
2214     my $source_schema;
2215     {
2216       my $t = SQL::Translator->new($sqltargs);
2217       $t->debug( 0 );
2218       $t->trace( 0 );
2219
2220       $t->parser( $db )
2221         or $self->throw_exception ($t->error);
2222
2223       my $out = $t->translate( $prefilename )
2224         or $self->throw_exception ($t->error);
2225
2226       $source_schema = $t->schema;
2227
2228       $source_schema->name( $prefilename )
2229         unless ( $source_schema->name );
2230     }
2231
2232     # The "new" style of producers have sane normalization and can support
2233     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2234     # And we have to diff parsed SQL against parsed SQL.
2235     my $dest_schema = $sqlt_schema;
2236
2237     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2238       my $t = SQL::Translator->new($sqltargs);
2239       $t->debug( 0 );
2240       $t->trace( 0 );
2241
2242       $t->parser( $db )
2243         or $self->throw_exception ($t->error);
2244
2245       my $out = $t->translate( $filename )
2246         or $self->throw_exception ($t->error);
2247
2248       $dest_schema = $t->schema;
2249
2250       $dest_schema->name( $filename )
2251         unless $dest_schema->name;
2252     }
2253
2254     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2255                                                   $dest_schema,   $db,
2256                                                   $sqltargs
2257                                                  );
2258     if(!open $file, ">$difffile") {
2259       $self->throw_exception("Can't write to $difffile ($!)");
2260       next;
2261     }
2262     print $file $diff;
2263     close($file);
2264   }
2265 }
2266
2267 =head2 deployment_statements
2268
2269 =over 4
2270
2271 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2272
2273 =back
2274
2275 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2276
2277 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2278 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2279
2280 C<$directory> is used to return statements from files in a previously created
2281 L</create_ddl_dir> directory and is optional. The filenames are constructed
2282 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2283
2284 If no C<$directory> is specified then the statements are constructed on the
2285 fly using L<SQL::Translator> and C<$version> is ignored.
2286
2287 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2288
2289 =cut
2290
2291 sub deployment_statements {
2292   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2293   $type ||= $self->sqlt_type;
2294   $version ||= $schema->schema_version || '1.x';
2295   $dir ||= './';
2296   my $filename = $schema->ddl_filename($type, $version, $dir);
2297   if(-f $filename)
2298   {
2299       my $file;
2300       open($file, "<$filename")
2301         or $self->throw_exception("Can't open $filename ($!)");
2302       my @rows = <$file>;
2303       close($file);
2304       return join('', @rows);
2305   }
2306
2307   $self->throw_exception("Can't deploy without either SQL::Translator or a ddl_dir: " . $self->_sqlt_version_error )
2308     if !$self->_sqlt_version_ok;
2309
2310   # sources needs to be a parser arg, but for simplicty allow at top level
2311   # coming in
2312   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2313       if exists $sqltargs->{sources};
2314
2315   my $tr = SQL::Translator->new(
2316     producer => "SQL::Translator::Producer::${type}",
2317     %$sqltargs,
2318     parser => 'SQL::Translator::Parser::DBIx::Class',
2319     data => $schema,
2320   );
2321
2322   my $ret = $tr->translate
2323     or $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error);
2324
2325   return $ret;
2326 }
2327
2328 sub deploy {
2329   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2330   my $deploy = sub {
2331     my $line = shift;
2332     return if($line =~ /^--/);
2333     return if(!$line);
2334     # next if($line =~ /^DROP/m);
2335     return if($line =~ /^BEGIN TRANSACTION/m);
2336     return if($line =~ /^COMMIT/m);
2337     return if $line =~ /^\s+$/; # skip whitespace only
2338     $self->_query_start($line);
2339     eval {
2340       # do a dbh_do cycle here, as we need some error checking in
2341       # place (even though we will ignore errors)
2342       $self->dbh_do (sub { $_[1]->do($line) });
2343     };
2344     if ($@) {
2345       carp qq{$@ (running "${line}")};
2346     }
2347     $self->_query_end($line);
2348   };
2349   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2350   if (@statements > 1) {
2351     foreach my $statement (@statements) {
2352       $deploy->( $statement );
2353     }
2354   }
2355   elsif (@statements == 1) {
2356     foreach my $line ( split(";\n", $statements[0])) {
2357       $deploy->( $line );
2358     }
2359   }
2360 }
2361
2362 =head2 datetime_parser
2363
2364 Returns the datetime parser class
2365
2366 =cut
2367
2368 sub datetime_parser {
2369   my $self = shift;
2370   return $self->{datetime_parser} ||= do {
2371     $self->build_datetime_parser(@_);
2372   };
2373 }
2374
2375 =head2 datetime_parser_type
2376
2377 Defines (returns) the datetime parser class - currently hardwired to
2378 L<DateTime::Format::MySQL>
2379
2380 =cut
2381
2382 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2383
2384 =head2 build_datetime_parser
2385
2386 See L</datetime_parser>
2387
2388 =cut
2389
2390 sub build_datetime_parser {
2391   my $self = shift;
2392   my $type = $self->datetime_parser_type(@_);
2393   $self->ensure_class_loaded ($type);
2394   return $type;
2395 }
2396
2397
2398 =head2 is_replicating
2399
2400 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2401 replicate from a master database.  Default is undef, which is the result
2402 returned by databases that don't support replication.
2403
2404 =cut
2405
2406 sub is_replicating {
2407     return;
2408
2409 }
2410
2411 =head2 lag_behind_master
2412
2413 Returns a number that represents a certain amount of lag behind a master db
2414 when a given storage is replicating.  The number is database dependent, but
2415 starts at zero and increases with the amount of lag. Default in undef
2416
2417 =cut
2418
2419 sub lag_behind_master {
2420     return;
2421 }
2422
2423 # SQLT version handling
2424 {
2425   my $_sqlt_version_ok;     # private
2426   my $_sqlt_version_error;  # private
2427
2428   sub _sqlt_version_ok {
2429     if (!defined $_sqlt_version_ok) {
2430       eval "use SQL::Translator $minimum_sqlt_version";
2431       if ($@) {
2432         $_sqlt_version_ok = 0;
2433         $_sqlt_version_error = $@;
2434       }
2435       else {
2436         $_sqlt_version_ok = 1;
2437       }
2438     }
2439     return $_sqlt_version_ok;
2440   }
2441
2442   sub _sqlt_version_error {
2443     shift->_sqlt_version_ok unless defined $_sqlt_version_ok;
2444     return $_sqlt_version_error;
2445   }
2446
2447   sub _sqlt_minimum_version { $minimum_sqlt_version };
2448 }
2449
2450 sub DESTROY {
2451   my $self = shift;
2452
2453   $self->_verify_pid if $self->_dbh;
2454
2455   # some databases need this to stop spewing warnings
2456   if (my $dbh = $self->_dbh) {
2457     local $@;
2458     eval { $dbh->disconnect };
2459   }
2460
2461   $self->_dbh(undef);
2462 }
2463
2464 1;
2465
2466 =head1 USAGE NOTES
2467
2468 =head2 DBIx::Class and AutoCommit
2469
2470 DBIx::Class can do some wonderful magic with handling exceptions,
2471 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2472 (the default) combined with C<txn_do> for transaction support.
2473
2474 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2475 in an assumed transaction between commits, and you're telling us you'd
2476 like to manage that manually.  A lot of the magic protections offered by
2477 this module will go away.  We can't protect you from exceptions due to database
2478 disconnects because we don't know anything about how to restart your
2479 transactions.  You're on your own for handling all sorts of exceptional
2480 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2481 be with raw DBI.
2482
2483
2484 =head1 AUTHORS
2485
2486 Matt S. Trout <mst@shadowcatsystems.co.uk>
2487
2488 Andy Grundman <andy@hybridized.org>
2489
2490 =head1 LICENSE
2491
2492 You may distribute this code under the same terms as Perl itself.
2493
2494 =cut