should work now
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17 use Sub::Name ();
18
19 # what version of sqlt do we require if deploy() without a ddl_dir is invoked
20 # when changing also adjust the corresponding author_require in Makefile.PL
21 my $minimum_sqlt_version = '0.11002';
22
23
24 __PACKAGE__->mk_group_accessors('simple' =>
25   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
26      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
27 );
28
29 # the values for these accessors are picked out (and deleted) from
30 # the attribute hashref passed to connect_info
31 my @storage_options = qw/
32   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
33   disable_sth_caching unsafe auto_savepoint
34 /;
35 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
36
37
38 # default cursor class, overridable in connect_info attributes
39 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
40
41 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
42 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
43
44
45 # Each of these methods need _determine_driver called before itself
46 # in order to function reliably. This is a purely DRY optimization
47 my @rdbms_specific_methods = qw/
48   deployment_statements
49   sqlt_type
50   build_datetime_parser
51   datetime_parser_type
52
53   insert
54   insert_bulk
55   update
56   delete
57   select
58   select_single
59 /;
60
61 for my $meth (@rdbms_specific_methods) {
62
63   my $orig = __PACKAGE__->can ($meth)
64     or next;
65
66   no strict qw/refs/;
67   no warnings qw/redefine/;
68   *{__PACKAGE__ ."::$meth"} = Sub::Name::subname $meth => sub {
69     if (not $_[0]->_driver_determined) {
70       $_[0]->_determine_driver;
71       goto $_[0]->can($meth);
72     }
73     $orig->(@_);
74   };
75 }
76
77
78 =head1 NAME
79
80 DBIx::Class::Storage::DBI - DBI storage handler
81
82 =head1 SYNOPSIS
83
84   my $schema = MySchema->connect('dbi:SQLite:my.db');
85
86   $schema->storage->debug(1);
87
88   my @stuff = $schema->storage->dbh_do(
89     sub {
90       my ($storage, $dbh, @args) = @_;
91       $dbh->do("DROP TABLE authors");
92     },
93     @column_list
94   );
95
96   $schema->resultset('Book')->search({
97      written_on => $schema->storage->datetime_parser(DateTime->now)
98   });
99
100 =head1 DESCRIPTION
101
102 This class represents the connection to an RDBMS via L<DBI>.  See
103 L<DBIx::Class::Storage> for general information.  This pod only
104 documents DBI-specific methods and behaviors.
105
106 =head1 METHODS
107
108 =cut
109
110 sub new {
111   my $new = shift->next::method(@_);
112
113   $new->transaction_depth(0);
114   $new->_sql_maker_opts({});
115   $new->{savepoints} = [];
116   $new->{_in_dbh_do} = 0;
117   $new->{_dbh_gen} = 0;
118
119   $new;
120 }
121
122 =head2 connect_info
123
124 This method is normally called by L<DBIx::Class::Schema/connection>, which
125 encapsulates its argument list in an arrayref before passing them here.
126
127 The argument list may contain:
128
129 =over
130
131 =item *
132
133 The same 4-element argument set one would normally pass to
134 L<DBI/connect>, optionally followed by
135 L<extra attributes|/DBIx::Class specific connection attributes>
136 recognized by DBIx::Class:
137
138   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
139
140 =item *
141
142 A single code reference which returns a connected
143 L<DBI database handle|DBI/connect> optionally followed by
144 L<extra attributes|/DBIx::Class specific connection attributes> recognized
145 by DBIx::Class:
146
147   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
148
149 =item *
150
151 A single hashref with all the attributes and the dsn/user/password
152 mixed together:
153
154   $connect_info_args = [{
155     dsn => $dsn,
156     user => $user,
157     password => $pass,
158     %dbi_attributes,
159     %extra_attributes,
160   }];
161
162   $connect_info_args = [{
163     dbh_maker => sub { DBI->connect (...) },
164     %dbi_attributes,
165     %extra_attributes,
166   }];
167
168 This is particularly useful for L<Catalyst> based applications, allowing the
169 following config (L<Config::General> style):
170
171   <Model::DB>
172     schema_class   App::DB
173     <connect_info>
174       dsn          dbi:mysql:database=test
175       user         testuser
176       password     TestPass
177       AutoCommit   1
178     </connect_info>
179   </Model::DB>
180
181 The C<dsn>/C<user>/C<password> combination can be substituted by the
182 C<dbh_maker> key whose value is a coderef that returns a connected
183 L<DBI database handle|DBI/connect>
184
185 =back
186
187 Please note that the L<DBI> docs recommend that you always explicitly
188 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
189 recommends that it be set to I<1>, and that you perform transactions
190 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
191 to I<1> if you do not do explicitly set it to zero.  This is the default
192 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
193
194 =head3 DBIx::Class specific connection attributes
195
196 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
197 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
198 the following connection options. These options can be mixed in with your other
199 L<DBI> connection attributes, or placed in a seperate hashref
200 (C<\%extra_attributes>) as shown above.
201
202 Every time C<connect_info> is invoked, any previous settings for
203 these options will be cleared before setting the new ones, regardless of
204 whether any options are specified in the new C<connect_info>.
205
206
207 =over
208
209 =item on_connect_do
210
211 Specifies things to do immediately after connecting or re-connecting to
212 the database.  Its value may contain:
213
214 =over
215
216 =item a scalar
217
218 This contains one SQL statement to execute.
219
220 =item an array reference
221
222 This contains SQL statements to execute in order.  Each element contains
223 a string or a code reference that returns a string.
224
225 =item a code reference
226
227 This contains some code to execute.  Unlike code references within an
228 array reference, its return value is ignored.
229
230 =back
231
232 =item on_disconnect_do
233
234 Takes arguments in the same form as L</on_connect_do> and executes them
235 immediately before disconnecting from the database.
236
237 Note, this only runs if you explicitly call L</disconnect> on the
238 storage object.
239
240 =item on_connect_call
241
242 A more generalized form of L</on_connect_do> that calls the specified
243 C<connect_call_METHOD> methods in your storage driver.
244
245   on_connect_do => 'select 1'
246
247 is equivalent to:
248
249   on_connect_call => [ [ do_sql => 'select 1' ] ]
250
251 Its values may contain:
252
253 =over
254
255 =item a scalar
256
257 Will call the C<connect_call_METHOD> method.
258
259 =item a code reference
260
261 Will execute C<< $code->($storage) >>
262
263 =item an array reference
264
265 Each value can be a method name or code reference.
266
267 =item an array of arrays
268
269 For each array, the first item is taken to be the C<connect_call_> method name
270 or code reference, and the rest are parameters to it.
271
272 =back
273
274 Some predefined storage methods you may use:
275
276 =over
277
278 =item do_sql
279
280 Executes a SQL string or a code reference that returns a SQL string. This is
281 what L</on_connect_do> and L</on_disconnect_do> use.
282
283 It can take:
284
285 =over
286
287 =item a scalar
288
289 Will execute the scalar as SQL.
290
291 =item an arrayref
292
293 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
294 attributes hashref and bind values.
295
296 =item a code reference
297
298 Will execute C<< $code->($storage) >> and execute the return array refs as
299 above.
300
301 =back
302
303 =item datetime_setup
304
305 Execute any statements necessary to initialize the database session to return
306 and accept datetime/timestamp values used with
307 L<DBIx::Class::InflateColumn::DateTime>.
308
309 Only necessary for some databases, see your specific storage driver for
310 implementation details.
311
312 =back
313
314 =item on_disconnect_call
315
316 Takes arguments in the same form as L</on_connect_call> and executes them
317 immediately before disconnecting from the database.
318
319 Calls the C<disconnect_call_METHOD> methods as opposed to the
320 C<connect_call_METHOD> methods called by L</on_connect_call>.
321
322 Note, this only runs if you explicitly call L</disconnect> on the
323 storage object.
324
325 =item disable_sth_caching
326
327 If set to a true value, this option will disable the caching of
328 statement handles via L<DBI/prepare_cached>.
329
330 =item limit_dialect
331
332 Sets the limit dialect. This is useful for JDBC-bridge among others
333 where the remote SQL-dialect cannot be determined by the name of the
334 driver alone. See also L<SQL::Abstract::Limit>.
335
336 =item quote_char
337
338 Specifies what characters to use to quote table and column names. If
339 you use this you will want to specify L</name_sep> as well.
340
341 C<quote_char> expects either a single character, in which case is it
342 is placed on either side of the table/column name, or an arrayref of length
343 2 in which case the table/column name is placed between the elements.
344
345 For example under MySQL you should use C<< quote_char => '`' >>, and for
346 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
347
348 =item name_sep
349
350 This only needs to be used in conjunction with C<quote_char>, and is used to
351 specify the charecter that seperates elements (schemas, tables, columns) from
352 each other. In most cases this is simply a C<.>.
353
354 The consequences of not supplying this value is that L<SQL::Abstract>
355 will assume DBIx::Class' uses of aliases to be complete column
356 names. The output will look like I<"me.name"> when it should actually
357 be I<"me"."name">.
358
359 =item unsafe
360
361 This Storage driver normally installs its own C<HandleError>, sets
362 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
363 all database handles, including those supplied by a coderef.  It does this
364 so that it can have consistent and useful error behavior.
365
366 If you set this option to a true value, Storage will not do its usual
367 modifications to the database handle's attributes, and instead relies on
368 the settings in your connect_info DBI options (or the values you set in
369 your connection coderef, in the case that you are connecting via coderef).
370
371 Note that your custom settings can cause Storage to malfunction,
372 especially if you set a C<HandleError> handler that suppresses exceptions
373 and/or disable C<RaiseError>.
374
375 =item auto_savepoint
376
377 If this option is true, L<DBIx::Class> will use savepoints when nesting
378 transactions, making it possible to recover from failure in the inner
379 transaction without having to abort all outer transactions.
380
381 =item cursor_class
382
383 Use this argument to supply a cursor class other than the default
384 L<DBIx::Class::Storage::DBI::Cursor>.
385
386 =back
387
388 Some real-life examples of arguments to L</connect_info> and
389 L<DBIx::Class::Schema/connect>
390
391   # Simple SQLite connection
392   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
393
394   # Connect via subref
395   ->connect_info([ sub { DBI->connect(...) } ]);
396
397   # Connect via subref in hashref
398   ->connect_info([{
399     dbh_maker => sub { DBI->connect(...) },
400     on_connect_do => 'alter session ...',
401   }]);
402
403   # A bit more complicated
404   ->connect_info(
405     [
406       'dbi:Pg:dbname=foo',
407       'postgres',
408       'my_pg_password',
409       { AutoCommit => 1 },
410       { quote_char => q{"}, name_sep => q{.} },
411     ]
412   );
413
414   # Equivalent to the previous example
415   ->connect_info(
416     [
417       'dbi:Pg:dbname=foo',
418       'postgres',
419       'my_pg_password',
420       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
421     ]
422   );
423
424   # Same, but with hashref as argument
425   # See parse_connect_info for explanation
426   ->connect_info(
427     [{
428       dsn         => 'dbi:Pg:dbname=foo',
429       user        => 'postgres',
430       password    => 'my_pg_password',
431       AutoCommit  => 1,
432       quote_char  => q{"},
433       name_sep    => q{.},
434     }]
435   );
436
437   # Subref + DBIx::Class-specific connection options
438   ->connect_info(
439     [
440       sub { DBI->connect(...) },
441       {
442           quote_char => q{`},
443           name_sep => q{@},
444           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
445           disable_sth_caching => 1,
446       },
447     ]
448   );
449
450
451
452 =cut
453
454 sub connect_info {
455   my ($self, $info) = @_;
456
457   return $self->_connect_info if !$info;
458
459   $self->_connect_info($info); # copy for _connect_info
460
461   $info = $self->_normalize_connect_info($info)
462     if ref $info eq 'ARRAY';
463
464   for my $storage_opt (keys %{ $info->{storage_options} }) {
465     my $value = $info->{storage_options}{$storage_opt};
466
467     $self->$storage_opt($value);
468   }
469
470   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
471   #  the new set of options
472   $self->_sql_maker(undef);
473   $self->_sql_maker_opts({});
474
475   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
476     my $value = $info->{sql_maker_options}{$sql_maker_opt};
477
478     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
479   }
480
481   my %attrs = (
482     %{ $self->_default_dbi_connect_attributes || {} },
483     %{ $info->{attributes} || {} },
484   );
485
486   my @args = @{ $info->{arguments} };
487
488   $self->_dbi_connect_info([@args,
489     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
490
491   return $self->_connect_info;
492 }
493
494 sub _normalize_connect_info {
495   my ($self, $info_arg) = @_;
496   my %info;
497
498   my @args = @$info_arg;  # take a shallow copy for further mutilation
499
500   # combine/pre-parse arguments depending on invocation style
501
502   my %attrs;
503   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
504     %attrs = %{ $args[1] || {} };
505     @args = $args[0];
506   }
507   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
508     %attrs = %{$args[0]};
509     @args = ();
510     if (my $code = delete $attrs{dbh_maker}) {
511       @args = $code;
512
513       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
514       if (@ignored) {
515         carp sprintf (
516             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
517           . "to the result of 'dbh_maker'",
518
519           join (', ', map { "'$_'" } (@ignored) ),
520         );
521       }
522     }
523     else {
524       @args = delete @attrs{qw/dsn user password/};
525     }
526   }
527   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
528     %attrs = (
529       % { $args[3] || {} },
530       % { $args[4] || {} },
531     );
532     @args = @args[0,1,2];
533   }
534
535   $info{arguments} = \@args;
536
537   my @storage_opts = grep exists $attrs{$_},
538     @storage_options, 'cursor_class';
539
540   @{ $info{storage_options} }{@storage_opts} =
541     delete @attrs{@storage_opts} if @storage_opts;
542
543   my @sql_maker_opts = grep exists $attrs{$_},
544     qw/limit_dialect quote_char name_sep/;
545
546   @{ $info{sql_maker_options} }{@sql_maker_opts} =
547     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
548
549   $info{attributes} = \%attrs if %attrs;
550
551   return \%info;
552 }
553
554 sub _default_dbi_connect_attributes {
555   return {
556     AutoCommit => 1,
557     RaiseError => 1,
558     PrintError => 0,
559   };
560 }
561
562 =head2 on_connect_do
563
564 This method is deprecated in favour of setting via L</connect_info>.
565
566 =cut
567
568 =head2 on_disconnect_do
569
570 This method is deprecated in favour of setting via L</connect_info>.
571
572 =cut
573
574 sub _parse_connect_do {
575   my ($self, $type) = @_;
576
577   my $val = $self->$type;
578   return () if not defined $val;
579
580   my @res;
581
582   if (not ref($val)) {
583     push @res, [ 'do_sql', $val ];
584   } elsif (ref($val) eq 'CODE') {
585     push @res, $val;
586   } elsif (ref($val) eq 'ARRAY') {
587     push @res, map { [ 'do_sql', $_ ] } @$val;
588   } else {
589     $self->throw_exception("Invalid type for $type: ".ref($val));
590   }
591
592   return \@res;
593 }
594
595 =head2 dbh_do
596
597 Arguments: ($subref | $method_name), @extra_coderef_args?
598
599 Execute the given $subref or $method_name using the new exception-based
600 connection management.
601
602 The first two arguments will be the storage object that C<dbh_do> was called
603 on and a database handle to use.  Any additional arguments will be passed
604 verbatim to the called subref as arguments 2 and onwards.
605
606 Using this (instead of $self->_dbh or $self->dbh) ensures correct
607 exception handling and reconnection (or failover in future subclasses).
608
609 Your subref should have no side-effects outside of the database, as
610 there is the potential for your subref to be partially double-executed
611 if the database connection was stale/dysfunctional.
612
613 Example:
614
615   my @stuff = $schema->storage->dbh_do(
616     sub {
617       my ($storage, $dbh, @cols) = @_;
618       my $cols = join(q{, }, @cols);
619       $dbh->selectrow_array("SELECT $cols FROM foo");
620     },
621     @column_list
622   );
623
624 =cut
625
626 sub dbh_do {
627   my $self = shift;
628   my $code = shift;
629
630   my $dbh = $self->_get_dbh;
631
632   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
633       || $self->{transaction_depth};
634
635   local $self->{_in_dbh_do} = 1;
636
637   my @result;
638   my $want_array = wantarray;
639
640   eval {
641
642     if($want_array) {
643         @result = $self->$code($dbh, @_);
644     }
645     elsif(defined $want_array) {
646         $result[0] = $self->$code($dbh, @_);
647     }
648     else {
649         $self->$code($dbh, @_);
650     }
651   };
652
653   # ->connected might unset $@ - copy
654   my $exception = $@;
655   if(!$exception) { return $want_array ? @result : $result[0] }
656
657   $self->throw_exception($exception) if $self->connected;
658
659   # We were not connected - reconnect and retry, but let any
660   #  exception fall right through this time
661   carp "Retrying $code after catching disconnected exception: $exception"
662     if $ENV{DBIC_DBIRETRY_DEBUG};
663   $self->_populate_dbh;
664   $self->$code($self->_dbh, @_);
665 }
666
667 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
668 # It also informs dbh_do to bypass itself while under the direction of txn_do,
669 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
670 sub txn_do {
671   my $self = shift;
672   my $coderef = shift;
673
674   ref $coderef eq 'CODE' or $self->throw_exception
675     ('$coderef must be a CODE reference');
676
677   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
678
679   local $self->{_in_dbh_do} = 1;
680
681   my @result;
682   my $want_array = wantarray;
683
684   my $tried = 0;
685   while(1) {
686     eval {
687       $self->_get_dbh;
688
689       $self->txn_begin;
690       if($want_array) {
691           @result = $coderef->(@_);
692       }
693       elsif(defined $want_array) {
694           $result[0] = $coderef->(@_);
695       }
696       else {
697           $coderef->(@_);
698       }
699       $self->txn_commit;
700     };
701
702     # ->connected might unset $@ - copy
703     my $exception = $@;
704     if(!$exception) { return $want_array ? @result : $result[0] }
705
706     if($tried++ || $self->connected) {
707       eval { $self->txn_rollback };
708       my $rollback_exception = $@;
709       if($rollback_exception) {
710         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
711         $self->throw_exception($exception)  # propagate nested rollback
712           if $rollback_exception =~ /$exception_class/;
713
714         $self->throw_exception(
715           "Transaction aborted: ${exception}. "
716           . "Rollback failed: ${rollback_exception}"
717         );
718       }
719       $self->throw_exception($exception)
720     }
721
722     # We were not connected, and was first try - reconnect and retry
723     # via the while loop
724     carp "Retrying $coderef after catching disconnected exception: $exception"
725       if $ENV{DBIC_DBIRETRY_DEBUG};
726     $self->_populate_dbh;
727   }
728 }
729
730 =head2 disconnect
731
732 Our C<disconnect> method also performs a rollback first if the
733 database is not in C<AutoCommit> mode.
734
735 =cut
736
737 sub disconnect {
738   my ($self) = @_;
739
740   if( $self->_dbh ) {
741     my @actions;
742
743     push @actions, ( $self->on_disconnect_call || () );
744     push @actions, $self->_parse_connect_do ('on_disconnect_do');
745
746     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
747
748     $self->_dbh_rollback unless $self->_dbh_autocommit;
749
750     $self->_dbh->disconnect;
751     $self->_dbh(undef);
752     $self->{_dbh_gen}++;
753   }
754 }
755
756 =head2 with_deferred_fk_checks
757
758 =over 4
759
760 =item Arguments: C<$coderef>
761
762 =item Return Value: The return value of $coderef
763
764 =back
765
766 Storage specific method to run the code ref with FK checks deferred or
767 in MySQL's case disabled entirely.
768
769 =cut
770
771 # Storage subclasses should override this
772 sub with_deferred_fk_checks {
773   my ($self, $sub) = @_;
774   $sub->();
775 }
776
777 =head2 connected
778
779 =over
780
781 =item Arguments: none
782
783 =item Return Value: 1|0
784
785 =back
786
787 Verifies that the the current database handle is active and ready to execute
788 an SQL statement (i.e. the connection did not get stale, server is still
789 answering, etc.) This method is used internally by L</dbh>.
790
791 =cut
792
793 sub connected {
794   my $self = shift;
795   return 0 unless $self->_seems_connected;
796
797   #be on the safe side
798   local $self->_dbh->{RaiseError} = 1;
799
800   return $self->_ping;
801 }
802
803 sub _seems_connected {
804   my $self = shift;
805
806   my $dbh = $self->_dbh
807     or return 0;
808
809   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
810     $self->_dbh(undef);
811     $self->{_dbh_gen}++;
812     return 0;
813   }
814   else {
815     $self->_verify_pid;
816     return 0 if !$self->_dbh;
817   }
818
819   return $dbh->FETCH('Active');
820 }
821
822 sub _ping {
823   my $self = shift;
824
825   my $dbh = $self->_dbh or return 0;
826
827   return $dbh->ping;
828 }
829
830 # handle pid changes correctly
831 #  NOTE: assumes $self->_dbh is a valid $dbh
832 sub _verify_pid {
833   my ($self) = @_;
834
835   return if defined $self->_conn_pid && $self->_conn_pid == $$;
836
837   $self->_dbh->{InactiveDestroy} = 1;
838   $self->_dbh(undef);
839   $self->{_dbh_gen}++;
840
841   return;
842 }
843
844 sub ensure_connected {
845   my ($self) = @_;
846
847   unless ($self->connected) {
848     $self->_populate_dbh;
849   }
850 }
851
852 =head2 dbh
853
854 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
855 is guaranteed to be healthy by implicitly calling L</connected>, and if
856 necessary performing a reconnection before returning. Keep in mind that this
857 is very B<expensive> on some database engines. Consider using L<dbh_do>
858 instead.
859
860 =cut
861
862 sub dbh {
863   my ($self) = @_;
864
865   if (not $self->_dbh) {
866     $self->_populate_dbh;
867   } else {
868     $self->ensure_connected;
869   }
870   return $self->_dbh;
871 }
872
873 # this is the internal "get dbh or connect (don't check)" method
874 sub _get_dbh {
875   my $self = shift;
876   $self->_verify_pid if $self->_dbh;
877   $self->_populate_dbh unless $self->_dbh;
878   return $self->_dbh;
879 }
880
881 sub _sql_maker_args {
882     my ($self) = @_;
883
884     return (
885       bindtype=>'columns',
886       array_datatypes => 1,
887       limit_dialect => $self->_get_dbh,
888       %{$self->_sql_maker_opts}
889     );
890 }
891
892 sub sql_maker {
893   my ($self) = @_;
894   unless ($self->_sql_maker) {
895     my $sql_maker_class = $self->sql_maker_class;
896     $self->ensure_class_loaded ($sql_maker_class);
897     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
898   }
899   return $self->_sql_maker;
900 }
901
902 # nothing to do by default
903 sub _rebless {}
904 sub _init {}
905
906 sub _populate_dbh {
907   my ($self) = @_;
908
909   my @info = @{$self->_dbi_connect_info || []};
910   $self->_dbh(undef); # in case ->connected failed we might get sent here
911   $self->_dbh($self->_connect(@info));
912
913   $self->_conn_pid($$);
914   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
915
916   $self->_determine_driver;
917
918   # Always set the transaction depth on connect, since
919   #  there is no transaction in progress by definition
920   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
921
922   $self->_run_connection_actions unless $self->{_in_determine_driver};
923 }
924
925 sub _run_connection_actions {
926   my $self = shift;
927   my @actions;
928
929   push @actions, ( $self->on_connect_call || () );
930   push @actions, $self->_parse_connect_do ('on_connect_do');
931
932   $self->_do_connection_actions(connect_call_ => $_) for @actions;
933 }
934
935 sub _determine_driver {
936   my ($self) = @_;
937
938   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
939     my $started_connected = 0;
940     local $self->{_in_determine_driver} = 1;
941
942     if (ref($self) eq __PACKAGE__) {
943       my $driver;
944       if ($self->_dbh) { # we are connected
945         $driver = $self->_dbh->{Driver}{Name};
946         $started_connected = 1;
947       } else {
948         # if connect_info is a CODEREF, we have no choice but to connect
949         if (ref $self->_dbi_connect_info->[0] &&
950             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
951           $self->_populate_dbh;
952           $driver = $self->_dbh->{Driver}{Name};
953         }
954         else {
955           # try to use dsn to not require being connected, the driver may still
956           # force a connection in _rebless to determine version
957           ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
958         }
959       }
960
961       my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
962       if ($self->load_optional_class($storage_class)) {
963         mro::set_mro($storage_class, 'c3');
964         bless $self, $storage_class;
965         $self->_rebless();
966       }
967     }
968
969     $self->_driver_determined(1);
970
971     $self->_init; # run driver-specific initializations
972
973     $self->_run_connection_actions
974         if !$started_connected && defined $self->_dbh;
975   }
976 }
977
978 sub _do_connection_actions {
979   my $self          = shift;
980   my $method_prefix = shift;
981   my $call          = shift;
982
983   if (not ref($call)) {
984     my $method = $method_prefix . $call;
985     $self->$method(@_);
986   } elsif (ref($call) eq 'CODE') {
987     $self->$call(@_);
988   } elsif (ref($call) eq 'ARRAY') {
989     if (ref($call->[0]) ne 'ARRAY') {
990       $self->_do_connection_actions($method_prefix, $_) for @$call;
991     } else {
992       $self->_do_connection_actions($method_prefix, @$_) for @$call;
993     }
994   } else {
995     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
996   }
997
998   return $self;
999 }
1000
1001 sub connect_call_do_sql {
1002   my $self = shift;
1003   $self->_do_query(@_);
1004 }
1005
1006 sub disconnect_call_do_sql {
1007   my $self = shift;
1008   $self->_do_query(@_);
1009 }
1010
1011 # override in db-specific backend when necessary
1012 sub connect_call_datetime_setup { 1 }
1013
1014 sub _do_query {
1015   my ($self, $action) = @_;
1016
1017   if (ref $action eq 'CODE') {
1018     $action = $action->($self);
1019     $self->_do_query($_) foreach @$action;
1020   }
1021   else {
1022     # Most debuggers expect ($sql, @bind), so we need to exclude
1023     # the attribute hash which is the second argument to $dbh->do
1024     # furthermore the bind values are usually to be presented
1025     # as named arrayref pairs, so wrap those here too
1026     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1027     my $sql = shift @do_args;
1028     my $attrs = shift @do_args;
1029     my @bind = map { [ undef, $_ ] } @do_args;
1030
1031     $self->_query_start($sql, @bind);
1032     $self->_get_dbh->do($sql, $attrs, @do_args);
1033     $self->_query_end($sql, @bind);
1034   }
1035
1036   return $self;
1037 }
1038
1039 sub _connect {
1040   my ($self, @info) = @_;
1041
1042   $self->throw_exception("You failed to provide any connection info")
1043     if !@info;
1044
1045   my ($old_connect_via, $dbh);
1046
1047   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1048     $old_connect_via = $DBI::connect_via;
1049     $DBI::connect_via = 'connect';
1050   }
1051
1052   eval {
1053     if(ref $info[0] eq 'CODE') {
1054        $dbh = $info[0]->();
1055     }
1056     else {
1057        $dbh = DBI->connect(@info);
1058     }
1059
1060     if($dbh && !$self->unsafe) {
1061       my $weak_self = $self;
1062       Scalar::Util::weaken($weak_self);
1063       $dbh->{HandleError} = sub {
1064           if ($weak_self) {
1065             $weak_self->throw_exception("DBI Exception: $_[0]");
1066           }
1067           else {
1068             # the handler may be invoked by something totally out of
1069             # the scope of DBIC
1070             croak ("DBI Exception: $_[0]");
1071           }
1072       };
1073       $dbh->{ShowErrorStatement} = 1;
1074       $dbh->{RaiseError} = 1;
1075       $dbh->{PrintError} = 0;
1076     }
1077   };
1078
1079   $DBI::connect_via = $old_connect_via if $old_connect_via;
1080
1081   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1082     if !$dbh || $@;
1083
1084   $self->_dbh_autocommit($dbh->{AutoCommit});
1085
1086   $dbh;
1087 }
1088
1089 sub svp_begin {
1090   my ($self, $name) = @_;
1091
1092   $name = $self->_svp_generate_name
1093     unless defined $name;
1094
1095   $self->throw_exception ("You can't use savepoints outside a transaction")
1096     if $self->{transaction_depth} == 0;
1097
1098   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1099     unless $self->can('_svp_begin');
1100
1101   push @{ $self->{savepoints} }, $name;
1102
1103   $self->debugobj->svp_begin($name) if $self->debug;
1104
1105   return $self->_svp_begin($name);
1106 }
1107
1108 sub svp_release {
1109   my ($self, $name) = @_;
1110
1111   $self->throw_exception ("You can't use savepoints outside a transaction")
1112     if $self->{transaction_depth} == 0;
1113
1114   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1115     unless $self->can('_svp_release');
1116
1117   if (defined $name) {
1118     $self->throw_exception ("Savepoint '$name' does not exist")
1119       unless grep { $_ eq $name } @{ $self->{savepoints} };
1120
1121     # Dig through the stack until we find the one we are releasing.  This keeps
1122     # the stack up to date.
1123     my $svp;
1124
1125     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1126   } else {
1127     $name = pop @{ $self->{savepoints} };
1128   }
1129
1130   $self->debugobj->svp_release($name) if $self->debug;
1131
1132   return $self->_svp_release($name);
1133 }
1134
1135 sub svp_rollback {
1136   my ($self, $name) = @_;
1137
1138   $self->throw_exception ("You can't use savepoints outside a transaction")
1139     if $self->{transaction_depth} == 0;
1140
1141   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1142     unless $self->can('_svp_rollback');
1143
1144   if (defined $name) {
1145       # If they passed us a name, verify that it exists in the stack
1146       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1147           $self->throw_exception("Savepoint '$name' does not exist!");
1148       }
1149
1150       # Dig through the stack until we find the one we are releasing.  This keeps
1151       # the stack up to date.
1152       while(my $s = pop(@{ $self->{savepoints} })) {
1153           last if($s eq $name);
1154       }
1155       # Add the savepoint back to the stack, as a rollback doesn't remove the
1156       # named savepoint, only everything after it.
1157       push(@{ $self->{savepoints} }, $name);
1158   } else {
1159       # We'll assume they want to rollback to the last savepoint
1160       $name = $self->{savepoints}->[-1];
1161   }
1162
1163   $self->debugobj->svp_rollback($name) if $self->debug;
1164
1165   return $self->_svp_rollback($name);
1166 }
1167
1168 sub _svp_generate_name {
1169     my ($self) = @_;
1170
1171     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1172 }
1173
1174 sub txn_begin {
1175   my $self = shift;
1176
1177   # this means we have not yet connected and do not know the AC status
1178   # (e.g. coderef $dbh)
1179   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1180
1181   if($self->{transaction_depth} == 0) {
1182     $self->debugobj->txn_begin()
1183       if $self->debug;
1184     $self->_dbh_begin_work;
1185   }
1186   elsif ($self->auto_savepoint) {
1187     $self->svp_begin;
1188   }
1189   $self->{transaction_depth}++;
1190 }
1191
1192 sub _dbh_begin_work {
1193   my $self = shift;
1194
1195   # if the user is utilizing txn_do - good for him, otherwise we need to
1196   # ensure that the $dbh is healthy on BEGIN.
1197   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1198   # will be replaced by a failure of begin_work itself (which will be
1199   # then retried on reconnect)
1200   if ($self->{_in_dbh_do}) {
1201     $self->_dbh->begin_work;
1202   } else {
1203     $self->dbh_do(sub { $_[1]->begin_work });
1204   }
1205 }
1206
1207 sub txn_commit {
1208   my $self = shift;
1209   if ($self->{transaction_depth} == 1) {
1210     $self->debugobj->txn_commit()
1211       if ($self->debug);
1212     $self->_dbh_commit;
1213     $self->{transaction_depth} = 0
1214       if $self->_dbh_autocommit;
1215   }
1216   elsif($self->{transaction_depth} > 1) {
1217     $self->{transaction_depth}--;
1218     $self->svp_release
1219       if $self->auto_savepoint;
1220   }
1221 }
1222
1223 sub _dbh_commit {
1224   my $self = shift;
1225   my $dbh  = $self->_dbh
1226     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1227   $dbh->commit;
1228 }
1229
1230 sub txn_rollback {
1231   my $self = shift;
1232   my $dbh = $self->_dbh;
1233   eval {
1234     if ($self->{transaction_depth} == 1) {
1235       $self->debugobj->txn_rollback()
1236         if ($self->debug);
1237       $self->{transaction_depth} = 0
1238         if $self->_dbh_autocommit;
1239       $self->_dbh_rollback;
1240     }
1241     elsif($self->{transaction_depth} > 1) {
1242       $self->{transaction_depth}--;
1243       if ($self->auto_savepoint) {
1244         $self->svp_rollback;
1245         $self->svp_release;
1246       }
1247     }
1248     else {
1249       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1250     }
1251   };
1252   if ($@) {
1253     my $error = $@;
1254     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1255     $error =~ /$exception_class/ and $self->throw_exception($error);
1256     # ensure that a failed rollback resets the transaction depth
1257     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1258     $self->throw_exception($error);
1259   }
1260 }
1261
1262 sub _dbh_rollback {
1263   my $self = shift;
1264   my $dbh  = $self->_dbh
1265     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1266   $dbh->rollback;
1267 }
1268
1269 # This used to be the top-half of _execute.  It was split out to make it
1270 #  easier to override in NoBindVars without duping the rest.  It takes up
1271 #  all of _execute's args, and emits $sql, @bind.
1272 sub _prep_for_execute {
1273   my ($self, $op, $extra_bind, $ident, $args) = @_;
1274
1275   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1276     $ident = $ident->from();
1277   }
1278
1279   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1280
1281   unshift(@bind,
1282     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1283       if $extra_bind;
1284   return ($sql, \@bind);
1285 }
1286
1287
1288 sub _fix_bind_params {
1289     my ($self, @bind) = @_;
1290
1291     ### Turn @bind from something like this:
1292     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1293     ### to this:
1294     ###   ( "'1'", "'1'", "'3'" )
1295     return
1296         map {
1297             if ( defined( $_ && $_->[1] ) ) {
1298                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1299             }
1300             else { q{'NULL'}; }
1301         } @bind;
1302 }
1303
1304 sub _query_start {
1305     my ( $self, $sql, @bind ) = @_;
1306
1307     if ( $self->debug ) {
1308         @bind = $self->_fix_bind_params(@bind);
1309
1310         $self->debugobj->query_start( $sql, @bind );
1311     }
1312 }
1313
1314 sub _query_end {
1315     my ( $self, $sql, @bind ) = @_;
1316
1317     if ( $self->debug ) {
1318         @bind = $self->_fix_bind_params(@bind);
1319         $self->debugobj->query_end( $sql, @bind );
1320     }
1321 }
1322
1323 sub _dbh_execute {
1324   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1325
1326   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1327
1328   $self->_query_start( $sql, @$bind );
1329
1330   my $sth = $self->sth($sql,$op);
1331
1332   my $placeholder_index = 1;
1333
1334   foreach my $bound (@$bind) {
1335     my $attributes = {};
1336     my($column_name, @data) = @$bound;
1337
1338     if ($bind_attributes) {
1339       $attributes = $bind_attributes->{$column_name}
1340       if defined $bind_attributes->{$column_name};
1341     }
1342
1343     foreach my $data (@data) {
1344       my $ref = ref $data;
1345       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1346
1347       $sth->bind_param($placeholder_index, $data, $attributes);
1348       $placeholder_index++;
1349     }
1350   }
1351
1352   # Can this fail without throwing an exception anyways???
1353   my $rv = $sth->execute();
1354   $self->throw_exception($sth->errstr) if !$rv;
1355
1356   $self->_query_end( $sql, @$bind );
1357
1358   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1359 }
1360
1361 sub _execute {
1362     my $self = shift;
1363     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1364 }
1365
1366 sub insert {
1367   my ($self, $source, $to_insert) = @_;
1368
1369   my $ident = $source->from;
1370   my $bind_attributes = $self->source_bind_attributes($source);
1371
1372   my $updated_cols = {};
1373
1374   foreach my $col ( $source->columns ) {
1375     if ( !defined $to_insert->{$col} ) {
1376       my $col_info = $source->column_info($col);
1377
1378       if ( $col_info->{auto_nextval} ) {
1379         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1380           'nextval',
1381           $col_info->{sequence} ||
1382             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
1383         );
1384       }
1385     }
1386   }
1387
1388   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1389
1390   return $updated_cols;
1391 }
1392
1393 ## Currently it is assumed that all values passed will be "normal", i.e. not
1394 ## scalar refs, or at least, all the same type as the first set, the statement is
1395 ## only prepped once.
1396 sub insert_bulk {
1397   my ($self, $source, $cols, $data) = @_;
1398
1399   my %colvalues;
1400   @colvalues{@$cols} = (0..$#$cols);
1401
1402   for my $i (0..$#$cols) {
1403     my $first_val = $data->[0][$i];
1404     next unless ref $first_val eq 'SCALAR';
1405
1406     $colvalues{ $cols->[$i] } = $first_val;
1407   }
1408
1409   # check for bad data and stringify stringifiable objects
1410   my $bad_slice = sub {
1411     my ($msg, $col_idx, $slice_idx) = @_;
1412     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1413       $msg,
1414       $cols->[$col_idx],
1415       do {
1416         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1417         Data::Dumper::Concise::Dumper({
1418           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1419         }),
1420       }
1421     );
1422   };
1423
1424   for my $datum_idx (0..$#$data) {
1425     my $datum = $data->[$datum_idx];
1426
1427     for my $col_idx (0..$#$cols) {
1428       my $val            = $datum->[$col_idx];
1429       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1430       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1431
1432       if ($is_literal_sql) {
1433         if (not ref $val) {
1434           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1435         }
1436         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1437           $bad_slice->("$reftype reference found where literal SQL expected",
1438             $col_idx, $datum_idx);
1439         }
1440         elsif ($$val ne $$sqla_bind){
1441           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1442             $col_idx, $datum_idx);
1443         }
1444       }
1445       elsif (my $reftype = ref $val) {
1446         require overload;
1447         if (overload::Method($val, '""')) {
1448           $datum->[$col_idx] = "".$val;
1449         }
1450         else {
1451           $bad_slice->("$reftype reference found where bind expected",
1452             $col_idx, $datum_idx);
1453         }
1454       }
1455     }
1456   }
1457
1458   my ($sql, $bind) = $self->_prep_for_execute (
1459     'insert', undef, $source, [\%colvalues]
1460   );
1461   my @bind = @$bind;
1462
1463   my $empty_bind = 1 if (not @bind) &&
1464     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1465
1466   if ((not @bind) && (not $empty_bind)) {
1467     $self->throw_exception(
1468       'Cannot insert_bulk without support for placeholders'
1469     );
1470   }
1471
1472   # neither _execute_array, nor _execute_inserts_with_no_binds are
1473   # atomic (even if _execute _array is a single call). Thus a safety
1474   # scope guard
1475   my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
1476
1477   $self->_query_start( $sql, ['__BULK__'] );
1478   my $sth = $self->sth($sql);
1479   my $rv = do {
1480     if ($empty_bind) {
1481       # bind_param_array doesn't work if there are no binds
1482       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1483     }
1484     else {
1485 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1486       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1487     }
1488   };
1489
1490   $self->_query_end( $sql, ['__BULK__'] );
1491
1492
1493   $guard->commit if $guard;
1494
1495   return (wantarray ? ($rv, $sth, @bind) : $rv);
1496 }
1497
1498 sub _execute_array {
1499   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1500
1501   ## This must be an arrayref, else nothing works!
1502   my $tuple_status = [];
1503
1504   ## Get the bind_attributes, if any exist
1505   my $bind_attributes = $self->source_bind_attributes($source);
1506
1507   ## Bind the values and execute
1508   my $placeholder_index = 1;
1509
1510   foreach my $bound (@$bind) {
1511
1512     my $attributes = {};
1513     my ($column_name, $data_index) = @$bound;
1514
1515     if( $bind_attributes ) {
1516       $attributes = $bind_attributes->{$column_name}
1517       if defined $bind_attributes->{$column_name};
1518     }
1519
1520     my @data = map { $_->[$data_index] } @$data;
1521
1522     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1523     $placeholder_index++;
1524   }
1525
1526   my $rv = eval {
1527     $self->_dbh_execute_array($sth, $tuple_status, @extra);
1528   };
1529   my $err = $@ || $sth->errstr;
1530
1531 # Statement must finish even if there was an exception.
1532   eval { $sth->finish };
1533   $err = $@ unless $err;
1534
1535   if ($err) {
1536     my $i = 0;
1537     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1538
1539     $self->throw_exception("Unexpected populate error: $err")
1540       if ($i > $#$tuple_status);
1541
1542     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1543       ($tuple_status->[$i][1] || $err),
1544       Data::Dumper::Concise::Dumper({
1545         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1546       }),
1547     );
1548   }
1549   return $rv;
1550 }
1551
1552 sub _dbh_execute_array {
1553     my ($self, $sth, $tuple_status, @extra) = @_;
1554
1555     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1556 }
1557
1558 sub _dbh_execute_inserts_with_no_binds {
1559   my ($self, $sth, $count) = @_;
1560
1561   eval {
1562     my $dbh = $self->_get_dbh;
1563     local $dbh->{RaiseError} = 1;
1564     local $dbh->{PrintError} = 0;
1565
1566     $sth->execute foreach 1..$count;
1567   };
1568   my $exception = $@;
1569
1570 # Make sure statement is finished even if there was an exception.
1571   eval { $sth->finish };
1572   $exception = $@ unless $exception;
1573
1574   $self->throw_exception($exception) if $exception;
1575
1576   return $count;
1577 }
1578
1579 sub update {
1580   my ($self, $source, @args) = @_;
1581
1582   my $bind_attrs = $self->source_bind_attributes($source);
1583
1584   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1585 }
1586
1587
1588 sub delete {
1589   my ($self, $source, @args) = @_;
1590
1591   my $bind_attrs = $self->source_bind_attributes($source);
1592
1593   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1594 }
1595
1596 # We were sent here because the $rs contains a complex search
1597 # which will require a subquery to select the correct rows
1598 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1599 #
1600 # Generating a single PK column subquery is trivial and supported
1601 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1602 # Look at _multipk_update_delete()
1603 sub _subq_update_delete {
1604   my $self = shift;
1605   my ($rs, $op, $values) = @_;
1606
1607   my $rsrc = $rs->result_source;
1608
1609   # quick check if we got a sane rs on our hands
1610   my @pcols = $rsrc->primary_columns;
1611   unless (@pcols) {
1612     $self->throw_exception (
1613       sprintf (
1614         "You must declare primary key(s) on source '%s' (via set_primary_key) in order to update or delete complex resultsets",
1615         $rsrc->source_name || $rsrc->from
1616       )
1617     );
1618   }
1619
1620   my $sel = $rs->_resolved_attrs->{select};
1621   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1622
1623   if (
1624       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1625         ne
1626       join ("\x00", sort @$sel )
1627   ) {
1628     $self->throw_exception (
1629       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1630     );
1631   }
1632
1633   if (@pcols == 1) {
1634     return $self->$op (
1635       $rsrc,
1636       $op eq 'update' ? $values : (),
1637       { $pcols[0] => { -in => $rs->as_query } },
1638     );
1639   }
1640
1641   else {
1642     return $self->_multipk_update_delete (@_);
1643   }
1644 }
1645
1646 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1647 # resultset update/delete involving subqueries. So by default resort
1648 # to simple (and inefficient) delete_all style per-row opearations,
1649 # while allowing specific storages to override this with a faster
1650 # implementation.
1651 #
1652 sub _multipk_update_delete {
1653   return shift->_per_row_update_delete (@_);
1654 }
1655
1656 # This is the default loop used to delete/update rows for multi PK
1657 # resultsets, and used by mysql exclusively (because it can't do anything
1658 # else).
1659 #
1660 # We do not use $row->$op style queries, because resultset update/delete
1661 # is not expected to cascade (this is what delete_all/update_all is for).
1662 #
1663 # There should be no race conditions as the entire operation is rolled
1664 # in a transaction.
1665 #
1666 sub _per_row_update_delete {
1667   my $self = shift;
1668   my ($rs, $op, $values) = @_;
1669
1670   my $rsrc = $rs->result_source;
1671   my @pcols = $rsrc->primary_columns;
1672
1673   my $guard = $self->txn_scope_guard;
1674
1675   # emulate the return value of $sth->execute for non-selects
1676   my $row_cnt = '0E0';
1677
1678   my $subrs_cur = $rs->cursor;
1679   my @all_pk = $subrs_cur->all;
1680   for my $pks ( @all_pk) {
1681
1682     my $cond;
1683     for my $i (0.. $#pcols) {
1684       $cond->{$pcols[$i]} = $pks->[$i];
1685     }
1686
1687     $self->$op (
1688       $rsrc,
1689       $op eq 'update' ? $values : (),
1690       $cond,
1691     );
1692
1693     $row_cnt++;
1694   }
1695
1696   $guard->commit;
1697
1698   return $row_cnt;
1699 }
1700
1701 sub _select {
1702   my $self = shift;
1703
1704   # localization is neccessary as
1705   # 1) there is no infrastructure to pass this around before SQLA2
1706   # 2) _select_args sets it and _prep_for_execute consumes it
1707   my $sql_maker = $self->sql_maker;
1708   local $sql_maker->{_dbic_rs_attrs};
1709
1710   return $self->_execute($self->_select_args(@_));
1711 }
1712
1713 sub _select_args_to_query {
1714   my $self = shift;
1715
1716   # localization is neccessary as
1717   # 1) there is no infrastructure to pass this around before SQLA2
1718   # 2) _select_args sets it and _prep_for_execute consumes it
1719   my $sql_maker = $self->sql_maker;
1720   local $sql_maker->{_dbic_rs_attrs};
1721
1722   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1723   #  = $self->_select_args($ident, $select, $cond, $attrs);
1724   my ($op, $bind, $ident, $bind_attrs, @args) =
1725     $self->_select_args(@_);
1726
1727   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1728   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1729   $prepared_bind ||= [];
1730
1731   return wantarray
1732     ? ($sql, $prepared_bind, $bind_attrs)
1733     : \[ "($sql)", @$prepared_bind ]
1734   ;
1735 }
1736
1737 sub _select_args {
1738   my ($self, $ident, $select, $where, $attrs) = @_;
1739
1740   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1741
1742   my $sql_maker = $self->sql_maker;
1743   $sql_maker->{_dbic_rs_attrs} = {
1744     %$attrs,
1745     select => $select,
1746     from => $ident,
1747     where => $where,
1748     $rs_alias && $alias2source->{$rs_alias}
1749       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1750       : ()
1751     ,
1752   };
1753
1754   # calculate bind_attrs before possible $ident mangling
1755   my $bind_attrs = {};
1756   for my $alias (keys %$alias2source) {
1757     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1758     for my $col (keys %$bindtypes) {
1759
1760       my $fqcn = join ('.', $alias, $col);
1761       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1762
1763       # Unqialified column names are nice, but at the same time can be
1764       # rather ambiguous. What we do here is basically go along with
1765       # the loop, adding an unqualified column slot to $bind_attrs,
1766       # alongside the fully qualified name. As soon as we encounter
1767       # another column by that name (which would imply another table)
1768       # we unset the unqualified slot and never add any info to it
1769       # to avoid erroneous type binding. If this happens the users
1770       # only choice will be to fully qualify his column name
1771
1772       if (exists $bind_attrs->{$col}) {
1773         $bind_attrs->{$col} = {};
1774       }
1775       else {
1776         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1777       }
1778     }
1779   }
1780
1781   # adjust limits
1782   if (
1783     $attrs->{software_limit}
1784       ||
1785     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1786   ) {
1787     $attrs->{software_limit} = 1;
1788   }
1789   else {
1790     $self->throw_exception("rows attribute must be positive if present")
1791       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1792
1793     # MySQL actually recommends this approach.  I cringe.
1794     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1795   }
1796
1797   my @limit;
1798
1799   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
1800   # storage, unless software limit was requested
1801   if (
1802     #limited has_many
1803     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1804        ||
1805     # limited prefetch with RNO subqueries
1806     (
1807       $attrs->{rows}
1808         &&
1809       $sql_maker->limit_dialect eq 'RowNumberOver'
1810         &&
1811       $attrs->{_prefetch_select}
1812         &&
1813       @{$attrs->{_prefetch_select}}
1814     )
1815       ||
1816     # grouped prefetch
1817     ( $attrs->{group_by}
1818         &&
1819       @{$attrs->{group_by}}
1820         &&
1821       $attrs->{_prefetch_select}
1822         &&
1823       @{$attrs->{_prefetch_select}}
1824     )
1825   ) {
1826     ($ident, $select, $where, $attrs)
1827       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1828   }
1829
1830   elsif (
1831     ($attrs->{rows} || $attrs->{offset})
1832       &&
1833     $sql_maker->limit_dialect eq 'RowNumberOver'
1834       &&
1835     (ref $ident eq 'ARRAY' && @$ident > 1)  # indicates a join
1836       &&
1837     scalar $self->_parse_order_by ($attrs->{order_by})
1838   ) {
1839     # the RNO limit dialect above mangles the SQL such that the join gets lost
1840     # wrap a subquery here
1841
1842     push @limit, delete @{$attrs}{qw/rows offset/};
1843
1844     my $subq = $self->_select_args_to_query (
1845       $ident,
1846       $select,
1847       $where,
1848       $attrs,
1849     );
1850
1851     $ident = {
1852       -alias => $attrs->{alias},
1853       -source_handle => $ident->[0]{-source_handle},
1854       $attrs->{alias} => $subq,
1855     };
1856
1857     # all part of the subquery now
1858     delete @{$attrs}{qw/order_by group_by having/};
1859     $where = undef;
1860   }
1861
1862   elsif (! $attrs->{software_limit} ) {
1863     push @limit, $attrs->{rows}, $attrs->{offset};
1864   }
1865
1866   # try to simplify the joinmap further (prune unreferenced type-single joins)
1867   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
1868
1869 ###
1870   # This would be the point to deflate anything found in $where
1871   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1872   # expect a row object. And all we have is a resultsource (it is trivial
1873   # to extract deflator coderefs via $alias2source above).
1874   #
1875   # I don't see a way forward other than changing the way deflators are
1876   # invoked, and that's just bad...
1877 ###
1878
1879   my $order = { map
1880     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1881     (qw/order_by group_by having/ )
1882   };
1883
1884   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1885 }
1886
1887 # Returns a counting SELECT for a simple count
1888 # query. Abstracted so that a storage could override
1889 # this to { count => 'firstcol' } or whatever makes
1890 # sense as a performance optimization
1891 sub _count_select {
1892   #my ($self, $source, $rs_attrs) = @_;
1893   return { count => '*' };
1894 }
1895
1896 # Returns a SELECT which will end up in the subselect
1897 # There may or may not be a group_by, as the subquery
1898 # might have been called to accomodate a limit
1899 #
1900 # Most databases would be happy with whatever ends up
1901 # here, but some choke in various ways.
1902 #
1903 sub _subq_count_select {
1904   my ($self, $source, $rs_attrs) = @_;
1905   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
1906
1907   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1908   return @pcols ? \@pcols : [ 1 ];
1909 }
1910
1911 sub source_bind_attributes {
1912   my ($self, $source) = @_;
1913
1914   my $bind_attributes;
1915   foreach my $column ($source->columns) {
1916
1917     my $data_type = $source->column_info($column)->{data_type} || '';
1918     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1919      if $data_type;
1920   }
1921
1922   return $bind_attributes;
1923 }
1924
1925 =head2 select
1926
1927 =over 4
1928
1929 =item Arguments: $ident, $select, $condition, $attrs
1930
1931 =back
1932
1933 Handle a SQL select statement.
1934
1935 =cut
1936
1937 sub select {
1938   my $self = shift;
1939   my ($ident, $select, $condition, $attrs) = @_;
1940   return $self->cursor_class->new($self, \@_, $attrs);
1941 }
1942
1943 sub select_single {
1944   my $self = shift;
1945   my ($rv, $sth, @bind) = $self->_select(@_);
1946   my @row = $sth->fetchrow_array;
1947   my @nextrow = $sth->fetchrow_array if @row;
1948   if(@row && @nextrow) {
1949     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1950   }
1951   # Need to call finish() to work round broken DBDs
1952   $sth->finish();
1953   return @row;
1954 }
1955
1956 =head2 sth
1957
1958 =over 4
1959
1960 =item Arguments: $sql
1961
1962 =back
1963
1964 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1965
1966 =cut
1967
1968 sub _dbh_sth {
1969   my ($self, $dbh, $sql) = @_;
1970
1971   # 3 is the if_active parameter which avoids active sth re-use
1972   my $sth = $self->disable_sth_caching
1973     ? $dbh->prepare($sql)
1974     : $dbh->prepare_cached($sql, {}, 3);
1975
1976   # XXX You would think RaiseError would make this impossible,
1977   #  but apparently that's not true :(
1978   $self->throw_exception($dbh->errstr) if !$sth;
1979
1980   $sth;
1981 }
1982
1983 sub sth {
1984   my ($self, $sql) = @_;
1985   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
1986 }
1987
1988 sub _dbh_columns_info_for {
1989   my ($self, $dbh, $table) = @_;
1990
1991   if ($dbh->can('column_info')) {
1992     my %result;
1993     eval {
1994       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1995       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1996       $sth->execute();
1997       while ( my $info = $sth->fetchrow_hashref() ){
1998         my %column_info;
1999         $column_info{data_type}   = $info->{TYPE_NAME};
2000         $column_info{size}      = $info->{COLUMN_SIZE};
2001         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2002         $column_info{default_value} = $info->{COLUMN_DEF};
2003         my $col_name = $info->{COLUMN_NAME};
2004         $col_name =~ s/^\"(.*)\"$/$1/;
2005
2006         $result{$col_name} = \%column_info;
2007       }
2008     };
2009     return \%result if !$@ && scalar keys %result;
2010   }
2011
2012   my %result;
2013   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2014   $sth->execute;
2015   my @columns = @{$sth->{NAME_lc}};
2016   for my $i ( 0 .. $#columns ){
2017     my %column_info;
2018     $column_info{data_type} = $sth->{TYPE}->[$i];
2019     $column_info{size} = $sth->{PRECISION}->[$i];
2020     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2021
2022     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2023       $column_info{data_type} = $1;
2024       $column_info{size}    = $2;
2025     }
2026
2027     $result{$columns[$i]} = \%column_info;
2028   }
2029   $sth->finish;
2030
2031   foreach my $col (keys %result) {
2032     my $colinfo = $result{$col};
2033     my $type_num = $colinfo->{data_type};
2034     my $type_name;
2035     if(defined $type_num && $dbh->can('type_info')) {
2036       my $type_info = $dbh->type_info($type_num);
2037       $type_name = $type_info->{TYPE_NAME} if $type_info;
2038       $colinfo->{data_type} = $type_name if $type_name;
2039     }
2040   }
2041
2042   return \%result;
2043 }
2044
2045 sub columns_info_for {
2046   my ($self, $table) = @_;
2047   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2048 }
2049
2050 =head2 last_insert_id
2051
2052 Return the row id of the last insert.
2053
2054 =cut
2055
2056 sub _dbh_last_insert_id {
2057     # All Storage's need to register their own _dbh_last_insert_id
2058     # the old SQLite-based method was highly inappropriate
2059
2060     my $self = shift;
2061     my $class = ref $self;
2062     $self->throw_exception (<<EOE);
2063
2064 No _dbh_last_insert_id() method found in $class.
2065 Since the method of obtaining the autoincrement id of the last insert
2066 operation varies greatly between different databases, this method must be
2067 individually implemented for every storage class.
2068 EOE
2069 }
2070
2071 sub last_insert_id {
2072   my $self = shift;
2073   $self->_dbh_last_insert_id ($self->_dbh, @_);
2074 }
2075
2076 =head2 _native_data_type
2077
2078 =over 4
2079
2080 =item Arguments: $type_name
2081
2082 =back
2083
2084 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2085 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2086 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2087
2088 The default implementation returns C<undef>, implement in your Storage driver if
2089 you need this functionality.
2090
2091 Should map types from other databases to the native RDBMS type, for example
2092 C<VARCHAR2> to C<VARCHAR>.
2093
2094 Types with modifiers should map to the underlying data type. For example,
2095 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2096
2097 Composite types should map to the container type, for example
2098 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2099
2100 =cut
2101
2102 sub _native_data_type {
2103   #my ($self, $data_type) = @_;
2104   return undef
2105 }
2106
2107 # Check if placeholders are supported at all
2108 sub _placeholders_supported {
2109   my $self = shift;
2110   my $dbh  = $self->_get_dbh;
2111
2112   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2113   # but it is inaccurate more often than not
2114   eval {
2115     local $dbh->{PrintError} = 0;
2116     local $dbh->{RaiseError} = 1;
2117     $dbh->do('select ?', {}, 1);
2118   };
2119   return $@ ? 0 : 1;
2120 }
2121
2122 # Check if placeholders bound to non-string types throw exceptions
2123 #
2124 sub _typeless_placeholders_supported {
2125   my $self = shift;
2126   my $dbh  = $self->_get_dbh;
2127
2128   eval {
2129     local $dbh->{PrintError} = 0;
2130     local $dbh->{RaiseError} = 1;
2131     # this specifically tests a bind that is NOT a string
2132     $dbh->do('select 1 where 1 = ?', {}, 1);
2133   };
2134   return $@ ? 0 : 1;
2135 }
2136
2137 =head2 sqlt_type
2138
2139 Returns the database driver name.
2140
2141 =cut
2142
2143 sub sqlt_type {
2144   shift->_get_dbh->{Driver}->{Name};
2145 }
2146
2147 =head2 bind_attribute_by_data_type
2148
2149 Given a datatype from column info, returns a database specific bind
2150 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2151 let the database planner just handle it.
2152
2153 Generally only needed for special case column types, like bytea in postgres.
2154
2155 =cut
2156
2157 sub bind_attribute_by_data_type {
2158     return;
2159 }
2160
2161 =head2 is_datatype_numeric
2162
2163 Given a datatype from column_info, returns a boolean value indicating if
2164 the current RDBMS considers it a numeric value. This controls how
2165 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2166 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2167 be performed instead of the usual C<eq>.
2168
2169 =cut
2170
2171 sub is_datatype_numeric {
2172   my ($self, $dt) = @_;
2173
2174   return 0 unless $dt;
2175
2176   return $dt =~ /^ (?:
2177     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2178   ) $/ix;
2179 }
2180
2181
2182 =head2 create_ddl_dir
2183
2184 =over 4
2185
2186 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2187
2188 =back
2189
2190 Creates a SQL file based on the Schema, for each of the specified
2191 database engines in C<\@databases> in the given directory.
2192 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2193
2194 Given a previous version number, this will also create a file containing
2195 the ALTER TABLE statements to transform the previous schema into the
2196 current one. Note that these statements may contain C<DROP TABLE> or
2197 C<DROP COLUMN> statements that can potentially destroy data.
2198
2199 The file names are created using the C<ddl_filename> method below, please
2200 override this method in your schema if you would like a different file
2201 name format. For the ALTER file, the same format is used, replacing
2202 $version in the name with "$preversion-$version".
2203
2204 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2205 The most common value for this would be C<< { add_drop_table => 1 } >>
2206 to have the SQL produced include a C<DROP TABLE> statement for each table
2207 created. For quoting purposes supply C<quote_table_names> and
2208 C<quote_field_names>.
2209
2210 If no arguments are passed, then the following default values are assumed:
2211
2212 =over 4
2213
2214 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2215
2216 =item version    - $schema->schema_version
2217
2218 =item directory  - './'
2219
2220 =item preversion - <none>
2221
2222 =back
2223
2224 By default, C<\%sqlt_args> will have
2225
2226  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2227
2228 merged with the hash passed in. To disable any of those features, pass in a
2229 hashref like the following
2230
2231  { ignore_constraint_names => 0, # ... other options }
2232
2233
2234 WARNING: You are strongly advised to check all SQL files created, before applying
2235 them.
2236
2237 =cut
2238
2239 sub create_ddl_dir {
2240   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2241
2242   if(!$dir || !-d $dir) {
2243     carp "No directory given, using ./\n";
2244     $dir = "./";
2245   }
2246   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2247   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2248
2249   my $schema_version = $schema->schema_version || '1.x';
2250   $version ||= $schema_version;
2251
2252   $sqltargs = {
2253     add_drop_table => 1,
2254     ignore_constraint_names => 1,
2255     ignore_index_names => 1,
2256     %{$sqltargs || {}}
2257   };
2258
2259   $self->throw_exception("Can't create a ddl file without SQL::Translator: " . $self->_sqlt_version_error)
2260     if !$self->_sqlt_version_ok;
2261
2262   my $sqlt = SQL::Translator->new( $sqltargs );
2263
2264   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2265   my $sqlt_schema = $sqlt->translate({ data => $schema })
2266     or $self->throw_exception ($sqlt->error);
2267
2268   foreach my $db (@$databases) {
2269     $sqlt->reset();
2270     $sqlt->{schema} = $sqlt_schema;
2271     $sqlt->producer($db);
2272
2273     my $file;
2274     my $filename = $schema->ddl_filename($db, $version, $dir);
2275     if (-e $filename && ($version eq $schema_version )) {
2276       # if we are dumping the current version, overwrite the DDL
2277       carp "Overwriting existing DDL file - $filename";
2278       unlink($filename);
2279     }
2280
2281     my $output = $sqlt->translate;
2282     if(!$output) {
2283       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2284       next;
2285     }
2286     if(!open($file, ">$filename")) {
2287       $self->throw_exception("Can't open $filename for writing ($!)");
2288       next;
2289     }
2290     print $file $output;
2291     close($file);
2292
2293     next unless ($preversion);
2294
2295     require SQL::Translator::Diff;
2296
2297     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2298     if(!-e $prefilename) {
2299       carp("No previous schema file found ($prefilename)");
2300       next;
2301     }
2302
2303     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2304     if(-e $difffile) {
2305       carp("Overwriting existing diff file - $difffile");
2306       unlink($difffile);
2307     }
2308
2309     my $source_schema;
2310     {
2311       my $t = SQL::Translator->new($sqltargs);
2312       $t->debug( 0 );
2313       $t->trace( 0 );
2314
2315       $t->parser( $db )
2316         or $self->throw_exception ($t->error);
2317
2318       my $out = $t->translate( $prefilename )
2319         or $self->throw_exception ($t->error);
2320
2321       $source_schema = $t->schema;
2322
2323       $source_schema->name( $prefilename )
2324         unless ( $source_schema->name );
2325     }
2326
2327     # The "new" style of producers have sane normalization and can support
2328     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2329     # And we have to diff parsed SQL against parsed SQL.
2330     my $dest_schema = $sqlt_schema;
2331
2332     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2333       my $t = SQL::Translator->new($sqltargs);
2334       $t->debug( 0 );
2335       $t->trace( 0 );
2336
2337       $t->parser( $db )
2338         or $self->throw_exception ($t->error);
2339
2340       my $out = $t->translate( $filename )
2341         or $self->throw_exception ($t->error);
2342
2343       $dest_schema = $t->schema;
2344
2345       $dest_schema->name( $filename )
2346         unless $dest_schema->name;
2347     }
2348
2349     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2350                                                   $dest_schema,   $db,
2351                                                   $sqltargs
2352                                                  );
2353     if(!open $file, ">$difffile") {
2354       $self->throw_exception("Can't write to $difffile ($!)");
2355       next;
2356     }
2357     print $file $diff;
2358     close($file);
2359   }
2360 }
2361
2362 =head2 deployment_statements
2363
2364 =over 4
2365
2366 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2367
2368 =back
2369
2370 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2371
2372 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2373 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2374
2375 C<$directory> is used to return statements from files in a previously created
2376 L</create_ddl_dir> directory and is optional. The filenames are constructed
2377 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2378
2379 If no C<$directory> is specified then the statements are constructed on the
2380 fly using L<SQL::Translator> and C<$version> is ignored.
2381
2382 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2383
2384 =cut
2385
2386 sub deployment_statements {
2387   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2388   $type ||= $self->sqlt_type;
2389   $version ||= $schema->schema_version || '1.x';
2390   $dir ||= './';
2391   my $filename = $schema->ddl_filename($type, $version, $dir);
2392   if(-f $filename)
2393   {
2394       my $file;
2395       open($file, "<$filename")
2396         or $self->throw_exception("Can't open $filename ($!)");
2397       my @rows = <$file>;
2398       close($file);
2399       return join('', @rows);
2400   }
2401
2402   $self->throw_exception("Can't deploy without either SQL::Translator or a ddl_dir: " . $self->_sqlt_version_error )
2403     if !$self->_sqlt_version_ok;
2404
2405   # sources needs to be a parser arg, but for simplicty allow at top level
2406   # coming in
2407   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2408       if exists $sqltargs->{sources};
2409
2410   my $tr = SQL::Translator->new(
2411     producer => "SQL::Translator::Producer::${type}",
2412     %$sqltargs,
2413     parser => 'SQL::Translator::Parser::DBIx::Class',
2414     data => $schema,
2415   );
2416
2417   my @ret;
2418   my $wa = wantarray;
2419   if ($wa) {
2420     @ret = $tr->translate;
2421   }
2422   else {
2423     $ret[0] = $tr->translate;
2424   }
2425
2426   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2427     unless (@ret && defined $ret[0]);
2428
2429   return $wa ? @ret : $ret[0];
2430 }
2431
2432 sub deploy {
2433   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2434   my $deploy = sub {
2435     my $line = shift;
2436     return if($line =~ /^--/);
2437     return if(!$line);
2438     # next if($line =~ /^DROP/m);
2439     return if($line =~ /^BEGIN TRANSACTION/m);
2440     return if($line =~ /^COMMIT/m);
2441     return if $line =~ /^\s+$/; # skip whitespace only
2442     $self->_query_start($line);
2443     eval {
2444       # do a dbh_do cycle here, as we need some error checking in
2445       # place (even though we will ignore errors)
2446       $self->dbh_do (sub { $_[1]->do($line) });
2447     };
2448     if ($@) {
2449       carp qq{$@ (running "${line}")};
2450     }
2451     $self->_query_end($line);
2452   };
2453   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2454   if (@statements > 1) {
2455     foreach my $statement (@statements) {
2456       $deploy->( $statement );
2457     }
2458   }
2459   elsif (@statements == 1) {
2460     foreach my $line ( split(";\n", $statements[0])) {
2461       $deploy->( $line );
2462     }
2463   }
2464 }
2465
2466 =head2 datetime_parser
2467
2468 Returns the datetime parser class
2469
2470 =cut
2471
2472 sub datetime_parser {
2473   my $self = shift;
2474   return $self->{datetime_parser} ||= do {
2475     $self->build_datetime_parser(@_);
2476   };
2477 }
2478
2479 =head2 datetime_parser_type
2480
2481 Defines (returns) the datetime parser class - currently hardwired to
2482 L<DateTime::Format::MySQL>
2483
2484 =cut
2485
2486 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2487
2488 =head2 build_datetime_parser
2489
2490 See L</datetime_parser>
2491
2492 =cut
2493
2494 sub build_datetime_parser {
2495   my $self = shift;
2496   my $type = $self->datetime_parser_type(@_);
2497   $self->ensure_class_loaded ($type);
2498   return $type;
2499 }
2500
2501
2502 =head2 is_replicating
2503
2504 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2505 replicate from a master database.  Default is undef, which is the result
2506 returned by databases that don't support replication.
2507
2508 =cut
2509
2510 sub is_replicating {
2511     return;
2512
2513 }
2514
2515 =head2 lag_behind_master
2516
2517 Returns a number that represents a certain amount of lag behind a master db
2518 when a given storage is replicating.  The number is database dependent, but
2519 starts at zero and increases with the amount of lag. Default in undef
2520
2521 =cut
2522
2523 sub lag_behind_master {
2524     return;
2525 }
2526
2527 # SQLT version handling
2528 {
2529   my $_sqlt_version_ok;     # private
2530   my $_sqlt_version_error;  # private
2531
2532   sub _sqlt_version_ok {
2533     if (!defined $_sqlt_version_ok) {
2534       eval "use SQL::Translator $minimum_sqlt_version";
2535       if ($@) {
2536         $_sqlt_version_ok = 0;
2537         $_sqlt_version_error = $@;
2538       }
2539       else {
2540         $_sqlt_version_ok = 1;
2541       }
2542     }
2543     return $_sqlt_version_ok;
2544   }
2545
2546   sub _sqlt_version_error {
2547     shift->_sqlt_version_ok unless defined $_sqlt_version_ok;
2548     return $_sqlt_version_error;
2549   }
2550
2551   sub _sqlt_minimum_version { $minimum_sqlt_version };
2552 }
2553
2554 =head2 relname_to_table_alias
2555
2556 =over 4
2557
2558 =item Arguments: $relname, $join_count
2559
2560 =back
2561
2562 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2563 queries.
2564
2565 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2566 way these aliases are named.
2567
2568 The default behavior is C<"$relname_$join_count" if $join_count > 1>, otherwise
2569 C<"$relname">.
2570
2571 =cut
2572
2573 sub relname_to_table_alias {
2574   my ($self, $relname, $join_count) = @_;
2575
2576   my $alias = ($join_count && $join_count > 1 ?
2577     join('_', $relname, $join_count) : $relname);
2578
2579   return $alias;
2580 }
2581
2582 sub DESTROY {
2583   my $self = shift;
2584
2585   $self->_verify_pid if $self->_dbh;
2586
2587   # some databases need this to stop spewing warnings
2588   if (my $dbh = $self->_dbh) {
2589     local $@;
2590     eval {
2591       %{ $dbh->{CachedKids} } = ();
2592       $dbh->disconnect;
2593     };
2594   }
2595
2596   $self->_dbh(undef);
2597 }
2598
2599 1;
2600
2601 =head1 USAGE NOTES
2602
2603 =head2 DBIx::Class and AutoCommit
2604
2605 DBIx::Class can do some wonderful magic with handling exceptions,
2606 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2607 (the default) combined with C<txn_do> for transaction support.
2608
2609 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2610 in an assumed transaction between commits, and you're telling us you'd
2611 like to manage that manually.  A lot of the magic protections offered by
2612 this module will go away.  We can't protect you from exceptions due to database
2613 disconnects because we don't know anything about how to restart your
2614 transactions.  You're on your own for handling all sorts of exceptional
2615 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2616 be with raw DBI.
2617
2618
2619 =head1 AUTHORS
2620
2621 Matt S. Trout <mst@shadowcatsystems.co.uk>
2622
2623 Andy Grundman <andy@hybridized.org>
2624
2625 =head1 LICENSE
2626
2627 You may distribute this code under the same terms as Perl itself.
2628
2629 =cut