collect _server_info on connection
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17 use Sub::Name ();
18
19 __PACKAGE__->mk_group_accessors('simple' =>
20   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
21      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints
22      _server_info/
23 );
24
25 # the values for these accessors are picked out (and deleted) from
26 # the attribute hashref passed to connect_info
27 my @storage_options = qw/
28   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
29   disable_sth_caching unsafe auto_savepoint
30 /;
31 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
32
33
34 # default cursor class, overridable in connect_info attributes
35 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
36
37 __PACKAGE__->mk_group_accessors('inherited' => qw/
38   sql_maker_class
39   can_insert_returning
40 /);
41 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
42
43
44 # Each of these methods need _determine_driver called before itself
45 # in order to function reliably. This is a purely DRY optimization
46 my @rdbms_specific_methods = qw/
47   deployment_statements
48   sqlt_type
49   build_datetime_parser
50   datetime_parser_type
51
52   insert
53   insert_bulk
54   update
55   delete
56   select
57   select_single
58 /;
59
60 for my $meth (@rdbms_specific_methods) {
61
62   my $orig = __PACKAGE__->can ($meth)
63     or next;
64
65   no strict qw/refs/;
66   no warnings qw/redefine/;
67   *{__PACKAGE__ ."::$meth"} = Sub::Name::subname $meth => sub {
68     if (not $_[0]->_driver_determined) {
69       $_[0]->_determine_driver;
70       goto $_[0]->can($meth);
71     }
72     $orig->(@_);
73   };
74 }
75
76
77 =head1 NAME
78
79 DBIx::Class::Storage::DBI - DBI storage handler
80
81 =head1 SYNOPSIS
82
83   my $schema = MySchema->connect('dbi:SQLite:my.db');
84
85   $schema->storage->debug(1);
86
87   my @stuff = $schema->storage->dbh_do(
88     sub {
89       my ($storage, $dbh, @args) = @_;
90       $dbh->do("DROP TABLE authors");
91     },
92     @column_list
93   );
94
95   $schema->resultset('Book')->search({
96      written_on => $schema->storage->datetime_parser(DateTime->now)
97   });
98
99 =head1 DESCRIPTION
100
101 This class represents the connection to an RDBMS via L<DBI>.  See
102 L<DBIx::Class::Storage> for general information.  This pod only
103 documents DBI-specific methods and behaviors.
104
105 =head1 METHODS
106
107 =cut
108
109 sub new {
110   my $new = shift->next::method(@_);
111
112   $new->transaction_depth(0);
113   $new->_sql_maker_opts({});
114   $new->{savepoints} = [];
115   $new->{_in_dbh_do} = 0;
116   $new->{_dbh_gen} = 0;
117
118   $new;
119 }
120
121 =head2 connect_info
122
123 This method is normally called by L<DBIx::Class::Schema/connection>, which
124 encapsulates its argument list in an arrayref before passing them here.
125
126 The argument list may contain:
127
128 =over
129
130 =item *
131
132 The same 4-element argument set one would normally pass to
133 L<DBI/connect>, optionally followed by
134 L<extra attributes|/DBIx::Class specific connection attributes>
135 recognized by DBIx::Class:
136
137   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
138
139 =item *
140
141 A single code reference which returns a connected
142 L<DBI database handle|DBI/connect> optionally followed by
143 L<extra attributes|/DBIx::Class specific connection attributes> recognized
144 by DBIx::Class:
145
146   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
147
148 =item *
149
150 A single hashref with all the attributes and the dsn/user/password
151 mixed together:
152
153   $connect_info_args = [{
154     dsn => $dsn,
155     user => $user,
156     password => $pass,
157     %dbi_attributes,
158     %extra_attributes,
159   }];
160
161   $connect_info_args = [{
162     dbh_maker => sub { DBI->connect (...) },
163     %dbi_attributes,
164     %extra_attributes,
165   }];
166
167 This is particularly useful for L<Catalyst> based applications, allowing the
168 following config (L<Config::General> style):
169
170   <Model::DB>
171     schema_class   App::DB
172     <connect_info>
173       dsn          dbi:mysql:database=test
174       user         testuser
175       password     TestPass
176       AutoCommit   1
177     </connect_info>
178   </Model::DB>
179
180 The C<dsn>/C<user>/C<password> combination can be substituted by the
181 C<dbh_maker> key whose value is a coderef that returns a connected
182 L<DBI database handle|DBI/connect>
183
184 =back
185
186 Please note that the L<DBI> docs recommend that you always explicitly
187 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
188 recommends that it be set to I<1>, and that you perform transactions
189 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
190 to I<1> if you do not do explicitly set it to zero.  This is the default
191 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
192
193 =head3 DBIx::Class specific connection attributes
194
195 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
196 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
197 the following connection options. These options can be mixed in with your other
198 L<DBI> connection attributes, or placed in a separate hashref
199 (C<\%extra_attributes>) as shown above.
200
201 Every time C<connect_info> is invoked, any previous settings for
202 these options will be cleared before setting the new ones, regardless of
203 whether any options are specified in the new C<connect_info>.
204
205
206 =over
207
208 =item on_connect_do
209
210 Specifies things to do immediately after connecting or re-connecting to
211 the database.  Its value may contain:
212
213 =over
214
215 =item a scalar
216
217 This contains one SQL statement to execute.
218
219 =item an array reference
220
221 This contains SQL statements to execute in order.  Each element contains
222 a string or a code reference that returns a string.
223
224 =item a code reference
225
226 This contains some code to execute.  Unlike code references within an
227 array reference, its return value is ignored.
228
229 =back
230
231 =item on_disconnect_do
232
233 Takes arguments in the same form as L</on_connect_do> and executes them
234 immediately before disconnecting from the database.
235
236 Note, this only runs if you explicitly call L</disconnect> on the
237 storage object.
238
239 =item on_connect_call
240
241 A more generalized form of L</on_connect_do> that calls the specified
242 C<connect_call_METHOD> methods in your storage driver.
243
244   on_connect_do => 'select 1'
245
246 is equivalent to:
247
248   on_connect_call => [ [ do_sql => 'select 1' ] ]
249
250 Its values may contain:
251
252 =over
253
254 =item a scalar
255
256 Will call the C<connect_call_METHOD> method.
257
258 =item a code reference
259
260 Will execute C<< $code->($storage) >>
261
262 =item an array reference
263
264 Each value can be a method name or code reference.
265
266 =item an array of arrays
267
268 For each array, the first item is taken to be the C<connect_call_> method name
269 or code reference, and the rest are parameters to it.
270
271 =back
272
273 Some predefined storage methods you may use:
274
275 =over
276
277 =item do_sql
278
279 Executes a SQL string or a code reference that returns a SQL string. This is
280 what L</on_connect_do> and L</on_disconnect_do> use.
281
282 It can take:
283
284 =over
285
286 =item a scalar
287
288 Will execute the scalar as SQL.
289
290 =item an arrayref
291
292 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
293 attributes hashref and bind values.
294
295 =item a code reference
296
297 Will execute C<< $code->($storage) >> and execute the return array refs as
298 above.
299
300 =back
301
302 =item datetime_setup
303
304 Execute any statements necessary to initialize the database session to return
305 and accept datetime/timestamp values used with
306 L<DBIx::Class::InflateColumn::DateTime>.
307
308 Only necessary for some databases, see your specific storage driver for
309 implementation details.
310
311 =back
312
313 =item on_disconnect_call
314
315 Takes arguments in the same form as L</on_connect_call> and executes them
316 immediately before disconnecting from the database.
317
318 Calls the C<disconnect_call_METHOD> methods as opposed to the
319 C<connect_call_METHOD> methods called by L</on_connect_call>.
320
321 Note, this only runs if you explicitly call L</disconnect> on the
322 storage object.
323
324 =item disable_sth_caching
325
326 If set to a true value, this option will disable the caching of
327 statement handles via L<DBI/prepare_cached>.
328
329 =item limit_dialect
330
331 Sets the limit dialect. This is useful for JDBC-bridge among others
332 where the remote SQL-dialect cannot be determined by the name of the
333 driver alone. See also L<SQL::Abstract::Limit>.
334
335 =item quote_char
336
337 Specifies what characters to use to quote table and column names. If
338 you use this you will want to specify L</name_sep> as well.
339
340 C<quote_char> expects either a single character, in which case is it
341 is placed on either side of the table/column name, or an arrayref of length
342 2 in which case the table/column name is placed between the elements.
343
344 For example under MySQL you should use C<< quote_char => '`' >>, and for
345 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
346
347 =item name_sep
348
349 This only needs to be used in conjunction with C<quote_char>, and is used to
350 specify the character that separates elements (schemas, tables, columns) from
351 each other. In most cases this is simply a C<.>.
352
353 The consequences of not supplying this value is that L<SQL::Abstract>
354 will assume DBIx::Class' uses of aliases to be complete column
355 names. The output will look like I<"me.name"> when it should actually
356 be I<"me"."name">.
357
358 =item unsafe
359
360 This Storage driver normally installs its own C<HandleError>, sets
361 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
362 all database handles, including those supplied by a coderef.  It does this
363 so that it can have consistent and useful error behavior.
364
365 If you set this option to a true value, Storage will not do its usual
366 modifications to the database handle's attributes, and instead relies on
367 the settings in your connect_info DBI options (or the values you set in
368 your connection coderef, in the case that you are connecting via coderef).
369
370 Note that your custom settings can cause Storage to malfunction,
371 especially if you set a C<HandleError> handler that suppresses exceptions
372 and/or disable C<RaiseError>.
373
374 =item auto_savepoint
375
376 If this option is true, L<DBIx::Class> will use savepoints when nesting
377 transactions, making it possible to recover from failure in the inner
378 transaction without having to abort all outer transactions.
379
380 =item cursor_class
381
382 Use this argument to supply a cursor class other than the default
383 L<DBIx::Class::Storage::DBI::Cursor>.
384
385 =back
386
387 Some real-life examples of arguments to L</connect_info> and
388 L<DBIx::Class::Schema/connect>
389
390   # Simple SQLite connection
391   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
392
393   # Connect via subref
394   ->connect_info([ sub { DBI->connect(...) } ]);
395
396   # Connect via subref in hashref
397   ->connect_info([{
398     dbh_maker => sub { DBI->connect(...) },
399     on_connect_do => 'alter session ...',
400   }]);
401
402   # A bit more complicated
403   ->connect_info(
404     [
405       'dbi:Pg:dbname=foo',
406       'postgres',
407       'my_pg_password',
408       { AutoCommit => 1 },
409       { quote_char => q{"}, name_sep => q{.} },
410     ]
411   );
412
413   # Equivalent to the previous example
414   ->connect_info(
415     [
416       'dbi:Pg:dbname=foo',
417       'postgres',
418       'my_pg_password',
419       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
420     ]
421   );
422
423   # Same, but with hashref as argument
424   # See parse_connect_info for explanation
425   ->connect_info(
426     [{
427       dsn         => 'dbi:Pg:dbname=foo',
428       user        => 'postgres',
429       password    => 'my_pg_password',
430       AutoCommit  => 1,
431       quote_char  => q{"},
432       name_sep    => q{.},
433     }]
434   );
435
436   # Subref + DBIx::Class-specific connection options
437   ->connect_info(
438     [
439       sub { DBI->connect(...) },
440       {
441           quote_char => q{`},
442           name_sep => q{@},
443           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
444           disable_sth_caching => 1,
445       },
446     ]
447   );
448
449
450
451 =cut
452
453 sub connect_info {
454   my ($self, $info) = @_;
455
456   return $self->_connect_info if !$info;
457
458   $self->_connect_info($info); # copy for _connect_info
459
460   $info = $self->_normalize_connect_info($info)
461     if ref $info eq 'ARRAY';
462
463   for my $storage_opt (keys %{ $info->{storage_options} }) {
464     my $value = $info->{storage_options}{$storage_opt};
465
466     $self->$storage_opt($value);
467   }
468
469   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
470   #  the new set of options
471   $self->_sql_maker(undef);
472   $self->_sql_maker_opts({});
473
474   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
475     my $value = $info->{sql_maker_options}{$sql_maker_opt};
476
477     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
478   }
479
480   my %attrs = (
481     %{ $self->_default_dbi_connect_attributes || {} },
482     %{ $info->{attributes} || {} },
483   );
484
485   my @args = @{ $info->{arguments} };
486
487   $self->_dbi_connect_info([@args,
488     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
489
490   return $self->_connect_info;
491 }
492
493 sub _normalize_connect_info {
494   my ($self, $info_arg) = @_;
495   my %info;
496
497   my @args = @$info_arg;  # take a shallow copy for further mutilation
498
499   # combine/pre-parse arguments depending on invocation style
500
501   my %attrs;
502   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
503     %attrs = %{ $args[1] || {} };
504     @args = $args[0];
505   }
506   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
507     %attrs = %{$args[0]};
508     @args = ();
509     if (my $code = delete $attrs{dbh_maker}) {
510       @args = $code;
511
512       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
513       if (@ignored) {
514         carp sprintf (
515             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
516           . "to the result of 'dbh_maker'",
517
518           join (', ', map { "'$_'" } (@ignored) ),
519         );
520       }
521     }
522     else {
523       @args = delete @attrs{qw/dsn user password/};
524     }
525   }
526   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
527     %attrs = (
528       % { $args[3] || {} },
529       % { $args[4] || {} },
530     );
531     @args = @args[0,1,2];
532   }
533
534   $info{arguments} = \@args;
535
536   my @storage_opts = grep exists $attrs{$_},
537     @storage_options, 'cursor_class';
538
539   @{ $info{storage_options} }{@storage_opts} =
540     delete @attrs{@storage_opts} if @storage_opts;
541
542   my @sql_maker_opts = grep exists $attrs{$_},
543     qw/limit_dialect quote_char name_sep/;
544
545   @{ $info{sql_maker_options} }{@sql_maker_opts} =
546     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
547
548   $info{attributes} = \%attrs if %attrs;
549
550   return \%info;
551 }
552
553 sub _default_dbi_connect_attributes {
554   return {
555     AutoCommit => 1,
556     RaiseError => 1,
557     PrintError => 0,
558   };
559 }
560
561 =head2 on_connect_do
562
563 This method is deprecated in favour of setting via L</connect_info>.
564
565 =cut
566
567 =head2 on_disconnect_do
568
569 This method is deprecated in favour of setting via L</connect_info>.
570
571 =cut
572
573 sub _parse_connect_do {
574   my ($self, $type) = @_;
575
576   my $val = $self->$type;
577   return () if not defined $val;
578
579   my @res;
580
581   if (not ref($val)) {
582     push @res, [ 'do_sql', $val ];
583   } elsif (ref($val) eq 'CODE') {
584     push @res, $val;
585   } elsif (ref($val) eq 'ARRAY') {
586     push @res, map { [ 'do_sql', $_ ] } @$val;
587   } else {
588     $self->throw_exception("Invalid type for $type: ".ref($val));
589   }
590
591   return \@res;
592 }
593
594 =head2 dbh_do
595
596 Arguments: ($subref | $method_name), @extra_coderef_args?
597
598 Execute the given $subref or $method_name using the new exception-based
599 connection management.
600
601 The first two arguments will be the storage object that C<dbh_do> was called
602 on and a database handle to use.  Any additional arguments will be passed
603 verbatim to the called subref as arguments 2 and onwards.
604
605 Using this (instead of $self->_dbh or $self->dbh) ensures correct
606 exception handling and reconnection (or failover in future subclasses).
607
608 Your subref should have no side-effects outside of the database, as
609 there is the potential for your subref to be partially double-executed
610 if the database connection was stale/dysfunctional.
611
612 Example:
613
614   my @stuff = $schema->storage->dbh_do(
615     sub {
616       my ($storage, $dbh, @cols) = @_;
617       my $cols = join(q{, }, @cols);
618       $dbh->selectrow_array("SELECT $cols FROM foo");
619     },
620     @column_list
621   );
622
623 =cut
624
625 sub dbh_do {
626   my $self = shift;
627   my $code = shift;
628
629   my $dbh = $self->_get_dbh;
630
631   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
632       || $self->{transaction_depth};
633
634   local $self->{_in_dbh_do} = 1;
635
636   my @result;
637   my $want_array = wantarray;
638
639   eval {
640
641     if($want_array) {
642         @result = $self->$code($dbh, @_);
643     }
644     elsif(defined $want_array) {
645         $result[0] = $self->$code($dbh, @_);
646     }
647     else {
648         $self->$code($dbh, @_);
649     }
650   };
651
652   # ->connected might unset $@ - copy
653   my $exception = $@;
654   if(!$exception) { return $want_array ? @result : $result[0] }
655
656   $self->throw_exception($exception) if $self->connected;
657
658   # We were not connected - reconnect and retry, but let any
659   #  exception fall right through this time
660   carp "Retrying $code after catching disconnected exception: $exception"
661     if $ENV{DBIC_DBIRETRY_DEBUG};
662   $self->_populate_dbh;
663   $self->$code($self->_dbh, @_);
664 }
665
666 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
667 # It also informs dbh_do to bypass itself while under the direction of txn_do,
668 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
669 sub txn_do {
670   my $self = shift;
671   my $coderef = shift;
672
673   ref $coderef eq 'CODE' or $self->throw_exception
674     ('$coderef must be a CODE reference');
675
676   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
677
678   local $self->{_in_dbh_do} = 1;
679
680   my @result;
681   my $want_array = wantarray;
682
683   my $tried = 0;
684   while(1) {
685     eval {
686       $self->_get_dbh;
687
688       $self->txn_begin;
689       if($want_array) {
690           @result = $coderef->(@_);
691       }
692       elsif(defined $want_array) {
693           $result[0] = $coderef->(@_);
694       }
695       else {
696           $coderef->(@_);
697       }
698       $self->txn_commit;
699     };
700
701     # ->connected might unset $@ - copy
702     my $exception = $@;
703     if(!$exception) { return $want_array ? @result : $result[0] }
704
705     if($tried++ || $self->connected) {
706       eval { $self->txn_rollback };
707       my $rollback_exception = $@;
708       if($rollback_exception) {
709         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
710         $self->throw_exception($exception)  # propagate nested rollback
711           if $rollback_exception =~ /$exception_class/;
712
713         $self->throw_exception(
714           "Transaction aborted: ${exception}. "
715           . "Rollback failed: ${rollback_exception}"
716         );
717       }
718       $self->throw_exception($exception)
719     }
720
721     # We were not connected, and was first try - reconnect and retry
722     # via the while loop
723     carp "Retrying $coderef after catching disconnected exception: $exception"
724       if $ENV{DBIC_DBIRETRY_DEBUG};
725     $self->_populate_dbh;
726   }
727 }
728
729 =head2 disconnect
730
731 Our C<disconnect> method also performs a rollback first if the
732 database is not in C<AutoCommit> mode.
733
734 =cut
735
736 sub disconnect {
737   my ($self) = @_;
738
739   if( $self->_dbh ) {
740     my @actions;
741
742     push @actions, ( $self->on_disconnect_call || () );
743     push @actions, $self->_parse_connect_do ('on_disconnect_do');
744
745     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
746
747     $self->_dbh_rollback unless $self->_dbh_autocommit;
748
749     %{ $self->_dbh->{CachedKids} } = ();
750     $self->_dbh->disconnect;
751     $self->_dbh(undef);
752     $self->{_dbh_gen}++;
753   }
754 }
755
756 =head2 with_deferred_fk_checks
757
758 =over 4
759
760 =item Arguments: C<$coderef>
761
762 =item Return Value: The return value of $coderef
763
764 =back
765
766 Storage specific method to run the code ref with FK checks deferred or
767 in MySQL's case disabled entirely.
768
769 =cut
770
771 # Storage subclasses should override this
772 sub with_deferred_fk_checks {
773   my ($self, $sub) = @_;
774   $sub->();
775 }
776
777 =head2 connected
778
779 =over
780
781 =item Arguments: none
782
783 =item Return Value: 1|0
784
785 =back
786
787 Verifies that the current database handle is active and ready to execute
788 an SQL statement (e.g. the connection did not get stale, server is still
789 answering, etc.) This method is used internally by L</dbh>.
790
791 =cut
792
793 sub connected {
794   my $self = shift;
795   return 0 unless $self->_seems_connected;
796
797   #be on the safe side
798   local $self->_dbh->{RaiseError} = 1;
799
800   return $self->_ping;
801 }
802
803 sub _seems_connected {
804   my $self = shift;
805
806   my $dbh = $self->_dbh
807     or return 0;
808
809   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
810     $self->_dbh(undef);
811     $self->{_dbh_gen}++;
812     return 0;
813   }
814   else {
815     $self->_verify_pid;
816     return 0 if !$self->_dbh;
817   }
818
819   return $dbh->FETCH('Active');
820 }
821
822 sub _ping {
823   my $self = shift;
824
825   my $dbh = $self->_dbh or return 0;
826
827   return $dbh->ping;
828 }
829
830 # handle pid changes correctly
831 #  NOTE: assumes $self->_dbh is a valid $dbh
832 sub _verify_pid {
833   my ($self) = @_;
834
835   return if defined $self->_conn_pid && $self->_conn_pid == $$;
836
837   $self->_dbh->{InactiveDestroy} = 1;
838   $self->_dbh(undef);
839   $self->{_dbh_gen}++;
840
841   return;
842 }
843
844 sub ensure_connected {
845   my ($self) = @_;
846
847   unless ($self->connected) {
848     $self->_populate_dbh;
849   }
850 }
851
852 =head2 dbh
853
854 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
855 is guaranteed to be healthy by implicitly calling L</connected>, and if
856 necessary performing a reconnection before returning. Keep in mind that this
857 is very B<expensive> on some database engines. Consider using L</dbh_do>
858 instead.
859
860 =cut
861
862 sub dbh {
863   my ($self) = @_;
864
865   if (not $self->_dbh) {
866     $self->_populate_dbh;
867   } else {
868     $self->ensure_connected;
869   }
870   return $self->_dbh;
871 }
872
873 # this is the internal "get dbh or connect (don't check)" method
874 sub _get_dbh {
875   my $self = shift;
876   $self->_verify_pid if $self->_dbh;
877   $self->_populate_dbh unless $self->_dbh;
878   return $self->_dbh;
879 }
880
881 sub _sql_maker_args {
882     my ($self) = @_;
883
884     return (
885       bindtype=>'columns',
886       array_datatypes => 1,
887       limit_dialect => $self->_get_dbh,
888       %{$self->_sql_maker_opts}
889     );
890 }
891
892 sub sql_maker {
893   my ($self) = @_;
894   unless ($self->_sql_maker) {
895     my $sql_maker_class = $self->sql_maker_class;
896     $self->ensure_class_loaded ($sql_maker_class);
897     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
898   }
899   return $self->_sql_maker;
900 }
901
902 # nothing to do by default
903 sub _rebless {}
904 sub _init {}
905
906 sub _populate_dbh {
907   my ($self) = @_;
908
909   my @info = @{$self->_dbi_connect_info || []};
910   $self->_dbh(undef); # in case ->connected failed we might get sent here
911   $self->_dbh($self->_connect(@info));
912
913   $self->_conn_pid($$);
914   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
915
916   $self->_determine_driver;
917
918   # Always set the transaction depth on connect, since
919   #  there is no transaction in progress by definition
920   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
921
922   $self->_run_connection_actions unless $self->{_in_determine_driver};
923
924   $self->_get_server_info;
925 }
926
927 sub _run_connection_actions {
928   my $self = shift;
929   my @actions;
930
931   push @actions, ( $self->on_connect_call || () );
932   push @actions, $self->_parse_connect_do ('on_connect_do');
933
934   $self->_do_connection_actions(connect_call_ => $_) for @actions;
935 }
936
937 sub _get_server_info {
938   my $self = shift;
939   my %info;
940
941   $info{dbms_ver} = $self->_get_dbh->get_info(18);
942
943   $self->_server_info(\%info);
944
945   return \%info;
946 }
947
948 sub _determine_driver {
949   my ($self) = @_;
950
951   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
952     my $started_connected = 0;
953     local $self->{_in_determine_driver} = 1;
954
955     if (ref($self) eq __PACKAGE__) {
956       my $driver;
957       if ($self->_dbh) { # we are connected
958         $driver = $self->_dbh->{Driver}{Name};
959         $started_connected = 1;
960       } else {
961         # if connect_info is a CODEREF, we have no choice but to connect
962         if (ref $self->_dbi_connect_info->[0] &&
963             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
964           $self->_populate_dbh;
965           $driver = $self->_dbh->{Driver}{Name};
966         }
967         else {
968           # try to use dsn to not require being connected, the driver may still
969           # force a connection in _rebless to determine version
970           # (dsn may not be supplied at all if all we do is make a mock-schema)
971           my $dsn = $self->_dbi_connect_info->[0] || '';
972           ($driver) = $dsn =~ /dbi:([^:]+):/i;
973         }
974       }
975
976       if ($driver) {
977         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
978         if ($self->load_optional_class($storage_class)) {
979           mro::set_mro($storage_class, 'c3');
980           bless $self, $storage_class;
981           $self->_rebless();
982         }
983       }
984     }
985
986     $self->_driver_determined(1);
987
988     $self->_init; # run driver-specific initializations
989
990     $self->_run_connection_actions
991         if !$started_connected && defined $self->_dbh;
992   }
993 }
994
995 sub _do_connection_actions {
996   my $self          = shift;
997   my $method_prefix = shift;
998   my $call          = shift;
999
1000   if (not ref($call)) {
1001     my $method = $method_prefix . $call;
1002     $self->$method(@_);
1003   } elsif (ref($call) eq 'CODE') {
1004     $self->$call(@_);
1005   } elsif (ref($call) eq 'ARRAY') {
1006     if (ref($call->[0]) ne 'ARRAY') {
1007       $self->_do_connection_actions($method_prefix, $_) for @$call;
1008     } else {
1009       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1010     }
1011   } else {
1012     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1013   }
1014
1015   return $self;
1016 }
1017
1018 sub connect_call_do_sql {
1019   my $self = shift;
1020   $self->_do_query(@_);
1021 }
1022
1023 sub disconnect_call_do_sql {
1024   my $self = shift;
1025   $self->_do_query(@_);
1026 }
1027
1028 # override in db-specific backend when necessary
1029 sub connect_call_datetime_setup { 1 }
1030
1031 sub _do_query {
1032   my ($self, $action) = @_;
1033
1034   if (ref $action eq 'CODE') {
1035     $action = $action->($self);
1036     $self->_do_query($_) foreach @$action;
1037   }
1038   else {
1039     # Most debuggers expect ($sql, @bind), so we need to exclude
1040     # the attribute hash which is the second argument to $dbh->do
1041     # furthermore the bind values are usually to be presented
1042     # as named arrayref pairs, so wrap those here too
1043     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1044     my $sql = shift @do_args;
1045     my $attrs = shift @do_args;
1046     my @bind = map { [ undef, $_ ] } @do_args;
1047
1048     $self->_query_start($sql, @bind);
1049     $self->_get_dbh->do($sql, $attrs, @do_args);
1050     $self->_query_end($sql, @bind);
1051   }
1052
1053   return $self;
1054 }
1055
1056 sub _connect {
1057   my ($self, @info) = @_;
1058
1059   $self->throw_exception("You failed to provide any connection info")
1060     if !@info;
1061
1062   my ($old_connect_via, $dbh);
1063
1064   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1065     $old_connect_via = $DBI::connect_via;
1066     $DBI::connect_via = 'connect';
1067   }
1068
1069   eval {
1070     if(ref $info[0] eq 'CODE') {
1071        $dbh = $info[0]->();
1072     }
1073     else {
1074        $dbh = DBI->connect(@info);
1075     }
1076
1077     if($dbh && !$self->unsafe) {
1078       my $weak_self = $self;
1079       Scalar::Util::weaken($weak_self);
1080       $dbh->{HandleError} = sub {
1081           if ($weak_self) {
1082             $weak_self->throw_exception("DBI Exception: $_[0]");
1083           }
1084           else {
1085             # the handler may be invoked by something totally out of
1086             # the scope of DBIC
1087             croak ("DBI Exception: $_[0]");
1088           }
1089       };
1090       $dbh->{ShowErrorStatement} = 1;
1091       $dbh->{RaiseError} = 1;
1092       $dbh->{PrintError} = 0;
1093     }
1094   };
1095
1096   $DBI::connect_via = $old_connect_via if $old_connect_via;
1097
1098   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1099     if !$dbh || $@;
1100
1101   $self->_dbh_autocommit($dbh->{AutoCommit});
1102
1103   $dbh;
1104 }
1105
1106 sub svp_begin {
1107   my ($self, $name) = @_;
1108
1109   $name = $self->_svp_generate_name
1110     unless defined $name;
1111
1112   $self->throw_exception ("You can't use savepoints outside a transaction")
1113     if $self->{transaction_depth} == 0;
1114
1115   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1116     unless $self->can('_svp_begin');
1117
1118   push @{ $self->{savepoints} }, $name;
1119
1120   $self->debugobj->svp_begin($name) if $self->debug;
1121
1122   return $self->_svp_begin($name);
1123 }
1124
1125 sub svp_release {
1126   my ($self, $name) = @_;
1127
1128   $self->throw_exception ("You can't use savepoints outside a transaction")
1129     if $self->{transaction_depth} == 0;
1130
1131   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1132     unless $self->can('_svp_release');
1133
1134   if (defined $name) {
1135     $self->throw_exception ("Savepoint '$name' does not exist")
1136       unless grep { $_ eq $name } @{ $self->{savepoints} };
1137
1138     # Dig through the stack until we find the one we are releasing.  This keeps
1139     # the stack up to date.
1140     my $svp;
1141
1142     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1143   } else {
1144     $name = pop @{ $self->{savepoints} };
1145   }
1146
1147   $self->debugobj->svp_release($name) if $self->debug;
1148
1149   return $self->_svp_release($name);
1150 }
1151
1152 sub svp_rollback {
1153   my ($self, $name) = @_;
1154
1155   $self->throw_exception ("You can't use savepoints outside a transaction")
1156     if $self->{transaction_depth} == 0;
1157
1158   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1159     unless $self->can('_svp_rollback');
1160
1161   if (defined $name) {
1162       # If they passed us a name, verify that it exists in the stack
1163       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1164           $self->throw_exception("Savepoint '$name' does not exist!");
1165       }
1166
1167       # Dig through the stack until we find the one we are releasing.  This keeps
1168       # the stack up to date.
1169       while(my $s = pop(@{ $self->{savepoints} })) {
1170           last if($s eq $name);
1171       }
1172       # Add the savepoint back to the stack, as a rollback doesn't remove the
1173       # named savepoint, only everything after it.
1174       push(@{ $self->{savepoints} }, $name);
1175   } else {
1176       # We'll assume they want to rollback to the last savepoint
1177       $name = $self->{savepoints}->[-1];
1178   }
1179
1180   $self->debugobj->svp_rollback($name) if $self->debug;
1181
1182   return $self->_svp_rollback($name);
1183 }
1184
1185 sub _svp_generate_name {
1186     my ($self) = @_;
1187
1188     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1189 }
1190
1191 sub txn_begin {
1192   my $self = shift;
1193
1194   # this means we have not yet connected and do not know the AC status
1195   # (e.g. coderef $dbh)
1196   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1197
1198   if($self->{transaction_depth} == 0) {
1199     $self->debugobj->txn_begin()
1200       if $self->debug;
1201     $self->_dbh_begin_work;
1202   }
1203   elsif ($self->auto_savepoint) {
1204     $self->svp_begin;
1205   }
1206   $self->{transaction_depth}++;
1207 }
1208
1209 sub _dbh_begin_work {
1210   my $self = shift;
1211
1212   # if the user is utilizing txn_do - good for him, otherwise we need to
1213   # ensure that the $dbh is healthy on BEGIN.
1214   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1215   # will be replaced by a failure of begin_work itself (which will be
1216   # then retried on reconnect)
1217   if ($self->{_in_dbh_do}) {
1218     $self->_dbh->begin_work;
1219   } else {
1220     $self->dbh_do(sub { $_[1]->begin_work });
1221   }
1222 }
1223
1224 sub txn_commit {
1225   my $self = shift;
1226   if ($self->{transaction_depth} == 1) {
1227     $self->debugobj->txn_commit()
1228       if ($self->debug);
1229     $self->_dbh_commit;
1230     $self->{transaction_depth} = 0
1231       if $self->_dbh_autocommit;
1232   }
1233   elsif($self->{transaction_depth} > 1) {
1234     $self->{transaction_depth}--;
1235     $self->svp_release
1236       if $self->auto_savepoint;
1237   }
1238 }
1239
1240 sub _dbh_commit {
1241   my $self = shift;
1242   my $dbh  = $self->_dbh
1243     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1244   $dbh->commit;
1245 }
1246
1247 sub txn_rollback {
1248   my $self = shift;
1249   my $dbh = $self->_dbh;
1250   eval {
1251     if ($self->{transaction_depth} == 1) {
1252       $self->debugobj->txn_rollback()
1253         if ($self->debug);
1254       $self->{transaction_depth} = 0
1255         if $self->_dbh_autocommit;
1256       $self->_dbh_rollback;
1257     }
1258     elsif($self->{transaction_depth} > 1) {
1259       $self->{transaction_depth}--;
1260       if ($self->auto_savepoint) {
1261         $self->svp_rollback;
1262         $self->svp_release;
1263       }
1264     }
1265     else {
1266       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1267     }
1268   };
1269   if ($@) {
1270     my $error = $@;
1271     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1272     $error =~ /$exception_class/ and $self->throw_exception($error);
1273     # ensure that a failed rollback resets the transaction depth
1274     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1275     $self->throw_exception($error);
1276   }
1277 }
1278
1279 sub _dbh_rollback {
1280   my $self = shift;
1281   my $dbh  = $self->_dbh
1282     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1283   $dbh->rollback;
1284 }
1285
1286 # This used to be the top-half of _execute.  It was split out to make it
1287 #  easier to override in NoBindVars without duping the rest.  It takes up
1288 #  all of _execute's args, and emits $sql, @bind.
1289 sub _prep_for_execute {
1290   my ($self, $op, $extra_bind, $ident, $args) = @_;
1291
1292   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1293     $ident = $ident->from();
1294   }
1295
1296   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1297
1298   unshift(@bind,
1299     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1300       if $extra_bind;
1301   return ($sql, \@bind);
1302 }
1303
1304
1305 sub _fix_bind_params {
1306     my ($self, @bind) = @_;
1307
1308     ### Turn @bind from something like this:
1309     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1310     ### to this:
1311     ###   ( "'1'", "'1'", "'3'" )
1312     return
1313         map {
1314             if ( defined( $_ && $_->[1] ) ) {
1315                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1316             }
1317             else { q{'NULL'}; }
1318         } @bind;
1319 }
1320
1321 sub _query_start {
1322     my ( $self, $sql, @bind ) = @_;
1323
1324     if ( $self->debug ) {
1325         @bind = $self->_fix_bind_params(@bind);
1326
1327         $self->debugobj->query_start( $sql, @bind );
1328     }
1329 }
1330
1331 sub _query_end {
1332     my ( $self, $sql, @bind ) = @_;
1333
1334     if ( $self->debug ) {
1335         @bind = $self->_fix_bind_params(@bind);
1336         $self->debugobj->query_end( $sql, @bind );
1337     }
1338 }
1339
1340 sub _dbh_execute {
1341   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1342
1343   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1344
1345   $self->_query_start( $sql, @$bind );
1346
1347   my $sth = $self->sth($sql,$op);
1348
1349   my $placeholder_index = 1;
1350
1351   foreach my $bound (@$bind) {
1352     my $attributes = {};
1353     my($column_name, @data) = @$bound;
1354
1355     if ($bind_attributes) {
1356       $attributes = $bind_attributes->{$column_name}
1357       if defined $bind_attributes->{$column_name};
1358     }
1359
1360     foreach my $data (@data) {
1361       my $ref = ref $data;
1362       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1363
1364       $sth->bind_param($placeholder_index, $data, $attributes);
1365       $placeholder_index++;
1366     }
1367   }
1368
1369   # Can this fail without throwing an exception anyways???
1370   my $rv = $sth->execute();
1371   $self->throw_exception($sth->errstr) if !$rv;
1372
1373   $self->_query_end( $sql, @$bind );
1374
1375   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1376 }
1377
1378 sub _execute {
1379     my $self = shift;
1380     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1381 }
1382
1383 sub _prefetch_insert_auto_nextvals {
1384   my ($self, $source, $to_insert) = @_;
1385
1386   my $upd = {};
1387
1388   foreach my $col ( $source->columns ) {
1389     if ( !defined $to_insert->{$col} ) {
1390       my $col_info = $source->column_info($col);
1391
1392       if ( $col_info->{auto_nextval} ) {
1393         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1394           'nextval',
1395           $col_info->{sequence} ||=
1396             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1397         );
1398       }
1399     }
1400   }
1401
1402   return $upd;
1403 }
1404
1405 sub insert {
1406   my $self = shift;
1407   my ($source, $to_insert, $opts) = @_;
1408
1409   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1410
1411   my $bind_attributes = $self->source_bind_attributes($source);
1412
1413   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1414
1415   if ($opts->{returning}) {
1416     my @ret_cols = @{$opts->{returning}};
1417
1418     my @ret_vals = eval {
1419       local $SIG{__WARN__} = sub {};
1420       my @r = $sth->fetchrow_array;
1421       $sth->finish;
1422       @r;
1423     };
1424
1425     my %ret;
1426     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1427
1428     $updated_cols = {
1429       %$updated_cols,
1430       %ret,
1431     };
1432   }
1433
1434   return $updated_cols;
1435 }
1436
1437 ## Currently it is assumed that all values passed will be "normal", i.e. not
1438 ## scalar refs, or at least, all the same type as the first set, the statement is
1439 ## only prepped once.
1440 sub insert_bulk {
1441   my ($self, $source, $cols, $data) = @_;
1442
1443   my %colvalues;
1444   @colvalues{@$cols} = (0..$#$cols);
1445
1446   for my $i (0..$#$cols) {
1447     my $first_val = $data->[0][$i];
1448     next unless ref $first_val eq 'SCALAR';
1449
1450     $colvalues{ $cols->[$i] } = $first_val;
1451   }
1452
1453   # check for bad data and stringify stringifiable objects
1454   my $bad_slice = sub {
1455     my ($msg, $col_idx, $slice_idx) = @_;
1456     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1457       $msg,
1458       $cols->[$col_idx],
1459       do {
1460         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1461         Data::Dumper::Concise::Dumper({
1462           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1463         }),
1464       }
1465     );
1466   };
1467
1468   for my $datum_idx (0..$#$data) {
1469     my $datum = $data->[$datum_idx];
1470
1471     for my $col_idx (0..$#$cols) {
1472       my $val            = $datum->[$col_idx];
1473       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1474       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1475
1476       if ($is_literal_sql) {
1477         if (not ref $val) {
1478           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1479         }
1480         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1481           $bad_slice->("$reftype reference found where literal SQL expected",
1482             $col_idx, $datum_idx);
1483         }
1484         elsif ($$val ne $$sqla_bind){
1485           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1486             $col_idx, $datum_idx);
1487         }
1488       }
1489       elsif (my $reftype = ref $val) {
1490         require overload;
1491         if (overload::Method($val, '""')) {
1492           $datum->[$col_idx] = "".$val;
1493         }
1494         else {
1495           $bad_slice->("$reftype reference found where bind expected",
1496             $col_idx, $datum_idx);
1497         }
1498       }
1499     }
1500   }
1501
1502   my ($sql, $bind) = $self->_prep_for_execute (
1503     'insert', undef, $source, [\%colvalues]
1504   );
1505   my @bind = @$bind;
1506
1507   my $empty_bind = 1 if (not @bind) &&
1508     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1509
1510   if ((not @bind) && (not $empty_bind)) {
1511     $self->throw_exception(
1512       'Cannot insert_bulk without support for placeholders'
1513     );
1514   }
1515
1516   # neither _execute_array, nor _execute_inserts_with_no_binds are
1517   # atomic (even if _execute _array is a single call). Thus a safety
1518   # scope guard
1519   my $guard = $self->txn_scope_guard;
1520
1521   $self->_query_start( $sql, ['__BULK__'] );
1522   my $sth = $self->sth($sql);
1523   my $rv = do {
1524     if ($empty_bind) {
1525       # bind_param_array doesn't work if there are no binds
1526       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1527     }
1528     else {
1529 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1530       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1531     }
1532   };
1533
1534   $self->_query_end( $sql, ['__BULK__'] );
1535
1536   $guard->commit;
1537
1538   return (wantarray ? ($rv, $sth, @bind) : $rv);
1539 }
1540
1541 sub _execute_array {
1542   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1543
1544   ## This must be an arrayref, else nothing works!
1545   my $tuple_status = [];
1546
1547   ## Get the bind_attributes, if any exist
1548   my $bind_attributes = $self->source_bind_attributes($source);
1549
1550   ## Bind the values and execute
1551   my $placeholder_index = 1;
1552
1553   foreach my $bound (@$bind) {
1554
1555     my $attributes = {};
1556     my ($column_name, $data_index) = @$bound;
1557
1558     if( $bind_attributes ) {
1559       $attributes = $bind_attributes->{$column_name}
1560       if defined $bind_attributes->{$column_name};
1561     }
1562
1563     my @data = map { $_->[$data_index] } @$data;
1564
1565     $sth->bind_param_array(
1566       $placeholder_index,
1567       [@data],
1568       (%$attributes ?  $attributes : ()),
1569     );
1570     $placeholder_index++;
1571   }
1572
1573   my $rv = eval {
1574     $self->_dbh_execute_array($sth, $tuple_status, @extra);
1575   };
1576   my $err = $@ || $sth->errstr;
1577
1578 # Statement must finish even if there was an exception.
1579   eval { $sth->finish };
1580   $err = $@ unless $err;
1581
1582   if ($err) {
1583     my $i = 0;
1584     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1585
1586     $self->throw_exception("Unexpected populate error: $err")
1587       if ($i > $#$tuple_status);
1588
1589     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1590       ($tuple_status->[$i][1] || $err),
1591       Data::Dumper::Concise::Dumper({
1592         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1593       }),
1594     );
1595   }
1596   return $rv;
1597 }
1598
1599 sub _dbh_execute_array {
1600     my ($self, $sth, $tuple_status, @extra) = @_;
1601
1602     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1603 }
1604
1605 sub _dbh_execute_inserts_with_no_binds {
1606   my ($self, $sth, $count) = @_;
1607
1608   eval {
1609     my $dbh = $self->_get_dbh;
1610     local $dbh->{RaiseError} = 1;
1611     local $dbh->{PrintError} = 0;
1612
1613     $sth->execute foreach 1..$count;
1614   };
1615   my $exception = $@;
1616
1617 # Make sure statement is finished even if there was an exception.
1618   eval { $sth->finish };
1619   $exception = $@ unless $exception;
1620
1621   $self->throw_exception($exception) if $exception;
1622
1623   return $count;
1624 }
1625
1626 sub update {
1627   my ($self, $source, @args) = @_;
1628
1629   my $bind_attrs = $self->source_bind_attributes($source);
1630
1631   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1632 }
1633
1634
1635 sub delete {
1636   my ($self, $source, @args) = @_;
1637
1638   my $bind_attrs = $self->source_bind_attributes($source);
1639
1640   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1641 }
1642
1643 # We were sent here because the $rs contains a complex search
1644 # which will require a subquery to select the correct rows
1645 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1646 #
1647 # Generating a single PK column subquery is trivial and supported
1648 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1649 # Look at _multipk_update_delete()
1650 sub _subq_update_delete {
1651   my $self = shift;
1652   my ($rs, $op, $values) = @_;
1653
1654   my $rsrc = $rs->result_source;
1655
1656   # quick check if we got a sane rs on our hands
1657   my @pcols = $rsrc->_pri_cols;
1658
1659   my $sel = $rs->_resolved_attrs->{select};
1660   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1661
1662   if (
1663       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1664         ne
1665       join ("\x00", sort @$sel )
1666   ) {
1667     $self->throw_exception (
1668       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1669     );
1670   }
1671
1672   if (@pcols == 1) {
1673     return $self->$op (
1674       $rsrc,
1675       $op eq 'update' ? $values : (),
1676       { $pcols[0] => { -in => $rs->as_query } },
1677     );
1678   }
1679
1680   else {
1681     return $self->_multipk_update_delete (@_);
1682   }
1683 }
1684
1685 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1686 # resultset update/delete involving subqueries. So by default resort
1687 # to simple (and inefficient) delete_all style per-row opearations,
1688 # while allowing specific storages to override this with a faster
1689 # implementation.
1690 #
1691 sub _multipk_update_delete {
1692   return shift->_per_row_update_delete (@_);
1693 }
1694
1695 # This is the default loop used to delete/update rows for multi PK
1696 # resultsets, and used by mysql exclusively (because it can't do anything
1697 # else).
1698 #
1699 # We do not use $row->$op style queries, because resultset update/delete
1700 # is not expected to cascade (this is what delete_all/update_all is for).
1701 #
1702 # There should be no race conditions as the entire operation is rolled
1703 # in a transaction.
1704 #
1705 sub _per_row_update_delete {
1706   my $self = shift;
1707   my ($rs, $op, $values) = @_;
1708
1709   my $rsrc = $rs->result_source;
1710   my @pcols = $rsrc->_pri_cols;
1711
1712   my $guard = $self->txn_scope_guard;
1713
1714   # emulate the return value of $sth->execute for non-selects
1715   my $row_cnt = '0E0';
1716
1717   my $subrs_cur = $rs->cursor;
1718   my @all_pk = $subrs_cur->all;
1719   for my $pks ( @all_pk) {
1720
1721     my $cond;
1722     for my $i (0.. $#pcols) {
1723       $cond->{$pcols[$i]} = $pks->[$i];
1724     }
1725
1726     $self->$op (
1727       $rsrc,
1728       $op eq 'update' ? $values : (),
1729       $cond,
1730     );
1731
1732     $row_cnt++;
1733   }
1734
1735   $guard->commit;
1736
1737   return $row_cnt;
1738 }
1739
1740 sub _select {
1741   my $self = shift;
1742
1743   # localization is neccessary as
1744   # 1) there is no infrastructure to pass this around before SQLA2
1745   # 2) _select_args sets it and _prep_for_execute consumes it
1746   my $sql_maker = $self->sql_maker;
1747   local $sql_maker->{_dbic_rs_attrs};
1748
1749   return $self->_execute($self->_select_args(@_));
1750 }
1751
1752 sub _select_args_to_query {
1753   my $self = shift;
1754
1755   # localization is neccessary as
1756   # 1) there is no infrastructure to pass this around before SQLA2
1757   # 2) _select_args sets it and _prep_for_execute consumes it
1758   my $sql_maker = $self->sql_maker;
1759   local $sql_maker->{_dbic_rs_attrs};
1760
1761   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1762   #  = $self->_select_args($ident, $select, $cond, $attrs);
1763   my ($op, $bind, $ident, $bind_attrs, @args) =
1764     $self->_select_args(@_);
1765
1766   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1767   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1768   $prepared_bind ||= [];
1769
1770   return wantarray
1771     ? ($sql, $prepared_bind, $bind_attrs)
1772     : \[ "($sql)", @$prepared_bind ]
1773   ;
1774 }
1775
1776 sub _select_args {
1777   my ($self, $ident, $select, $where, $attrs) = @_;
1778
1779   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1780
1781   my $sql_maker = $self->sql_maker;
1782   $sql_maker->{_dbic_rs_attrs} = {
1783     %$attrs,
1784     select => $select,
1785     from => $ident,
1786     where => $where,
1787     $rs_alias && $alias2source->{$rs_alias}
1788       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1789       : ()
1790     ,
1791   };
1792
1793   # calculate bind_attrs before possible $ident mangling
1794   my $bind_attrs = {};
1795   for my $alias (keys %$alias2source) {
1796     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1797     for my $col (keys %$bindtypes) {
1798
1799       my $fqcn = join ('.', $alias, $col);
1800       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1801
1802       # Unqialified column names are nice, but at the same time can be
1803       # rather ambiguous. What we do here is basically go along with
1804       # the loop, adding an unqualified column slot to $bind_attrs,
1805       # alongside the fully qualified name. As soon as we encounter
1806       # another column by that name (which would imply another table)
1807       # we unset the unqualified slot and never add any info to it
1808       # to avoid erroneous type binding. If this happens the users
1809       # only choice will be to fully qualify his column name
1810
1811       if (exists $bind_attrs->{$col}) {
1812         $bind_attrs->{$col} = {};
1813       }
1814       else {
1815         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1816       }
1817     }
1818   }
1819
1820   # adjust limits
1821   if (
1822     $attrs->{software_limit}
1823       ||
1824     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1825   ) {
1826     $attrs->{software_limit} = 1;
1827   }
1828   else {
1829     $self->throw_exception("rows attribute must be positive if present")
1830       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1831
1832     # MySQL actually recommends this approach.  I cringe.
1833     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1834   }
1835
1836   my @limit;
1837
1838   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
1839   # storage, unless software limit was requested
1840   if (
1841     #limited has_many
1842     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1843        ||
1844     # limited prefetch with RNO subqueries
1845     (
1846       $attrs->{rows}
1847         &&
1848       $sql_maker->limit_dialect eq 'RowNumberOver'
1849         &&
1850       $attrs->{_prefetch_select}
1851         &&
1852       @{$attrs->{_prefetch_select}}
1853     )
1854       ||
1855     # grouped prefetch
1856     ( $attrs->{group_by}
1857         &&
1858       @{$attrs->{group_by}}
1859         &&
1860       $attrs->{_prefetch_select}
1861         &&
1862       @{$attrs->{_prefetch_select}}
1863     )
1864   ) {
1865     ($ident, $select, $where, $attrs)
1866       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1867   }
1868
1869   elsif (
1870     ($attrs->{rows} || $attrs->{offset})
1871       &&
1872     $sql_maker->limit_dialect eq 'RowNumberOver'
1873       &&
1874     (ref $ident eq 'ARRAY' && @$ident > 1)  # indicates a join
1875       &&
1876     scalar $self->_parse_order_by ($attrs->{order_by})
1877   ) {
1878     # the RNO limit dialect above mangles the SQL such that the join gets lost
1879     # wrap a subquery here
1880
1881     push @limit, delete @{$attrs}{qw/rows offset/};
1882
1883     my $subq = $self->_select_args_to_query (
1884       $ident,
1885       $select,
1886       $where,
1887       $attrs,
1888     );
1889
1890     $ident = {
1891       -alias => $attrs->{alias},
1892       -source_handle => $ident->[0]{-source_handle},
1893       $attrs->{alias} => $subq,
1894     };
1895
1896     # all part of the subquery now
1897     delete @{$attrs}{qw/order_by group_by having/};
1898     $where = undef;
1899   }
1900
1901   elsif (! $attrs->{software_limit} ) {
1902     push @limit, $attrs->{rows}, $attrs->{offset};
1903   }
1904
1905   # try to simplify the joinmap further (prune unreferenced type-single joins)
1906   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
1907
1908 ###
1909   # This would be the point to deflate anything found in $where
1910   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1911   # expect a row object. And all we have is a resultsource (it is trivial
1912   # to extract deflator coderefs via $alias2source above).
1913   #
1914   # I don't see a way forward other than changing the way deflators are
1915   # invoked, and that's just bad...
1916 ###
1917
1918   my $order = { map
1919     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1920     (qw/order_by group_by having/ )
1921   };
1922
1923   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1924 }
1925
1926 # Returns a counting SELECT for a simple count
1927 # query. Abstracted so that a storage could override
1928 # this to { count => 'firstcol' } or whatever makes
1929 # sense as a performance optimization
1930 sub _count_select {
1931   #my ($self, $source, $rs_attrs) = @_;
1932   return { count => '*' };
1933 }
1934
1935 # Returns a SELECT which will end up in the subselect
1936 # There may or may not be a group_by, as the subquery
1937 # might have been called to accomodate a limit
1938 #
1939 # Most databases would be happy with whatever ends up
1940 # here, but some choke in various ways.
1941 #
1942 sub _subq_count_select {
1943   my ($self, $source, $rs_attrs) = @_;
1944
1945   if (my $groupby = $rs_attrs->{group_by}) {
1946
1947     my $avail_columns = $self->_resolve_column_info ($rs_attrs->{from});
1948
1949     my $sel_index;
1950     for my $sel (@{$rs_attrs->{select}}) {
1951       if (ref $sel eq 'HASH' and $sel->{-as}) {
1952         $sel_index->{$sel->{-as}} = $sel;
1953       }
1954     }
1955
1956     my @selection;
1957     for my $g_part (@$groupby) {
1958       if (ref $g_part or $avail_columns->{$g_part}) {
1959         push @selection, $g_part;
1960       }
1961       elsif ($sel_index->{$g_part}) {
1962         push @selection, $sel_index->{$g_part};
1963       }
1964       else {
1965         $self->throw_exception ("group_by criteria '$g_part' not contained within current resultset source(s)");
1966       }
1967     }
1968
1969     return \@selection;
1970   }
1971
1972   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1973   return @pcols ? \@pcols : [ 1 ];
1974 }
1975
1976 sub source_bind_attributes {
1977   my ($self, $source) = @_;
1978
1979   my $bind_attributes;
1980   foreach my $column ($source->columns) {
1981
1982     my $data_type = $source->column_info($column)->{data_type} || '';
1983     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1984      if $data_type;
1985   }
1986
1987   return $bind_attributes;
1988 }
1989
1990 =head2 select
1991
1992 =over 4
1993
1994 =item Arguments: $ident, $select, $condition, $attrs
1995
1996 =back
1997
1998 Handle a SQL select statement.
1999
2000 =cut
2001
2002 sub select {
2003   my $self = shift;
2004   my ($ident, $select, $condition, $attrs) = @_;
2005   return $self->cursor_class->new($self, \@_, $attrs);
2006 }
2007
2008 sub select_single {
2009   my $self = shift;
2010   my ($rv, $sth, @bind) = $self->_select(@_);
2011   my @row = $sth->fetchrow_array;
2012   my @nextrow = $sth->fetchrow_array if @row;
2013   if(@row && @nextrow) {
2014     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2015   }
2016   # Need to call finish() to work round broken DBDs
2017   $sth->finish();
2018   return @row;
2019 }
2020
2021 =head2 sth
2022
2023 =over 4
2024
2025 =item Arguments: $sql
2026
2027 =back
2028
2029 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2030
2031 =cut
2032
2033 sub _dbh_sth {
2034   my ($self, $dbh, $sql) = @_;
2035
2036   # 3 is the if_active parameter which avoids active sth re-use
2037   my $sth = $self->disable_sth_caching
2038     ? $dbh->prepare($sql)
2039     : $dbh->prepare_cached($sql, {}, 3);
2040
2041   # XXX You would think RaiseError would make this impossible,
2042   #  but apparently that's not true :(
2043   $self->throw_exception($dbh->errstr) if !$sth;
2044
2045   $sth;
2046 }
2047
2048 sub sth {
2049   my ($self, $sql) = @_;
2050   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2051 }
2052
2053 sub _dbh_columns_info_for {
2054   my ($self, $dbh, $table) = @_;
2055
2056   if ($dbh->can('column_info')) {
2057     my %result;
2058     eval {
2059       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2060       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2061       $sth->execute();
2062       while ( my $info = $sth->fetchrow_hashref() ){
2063         my %column_info;
2064         $column_info{data_type}   = $info->{TYPE_NAME};
2065         $column_info{size}      = $info->{COLUMN_SIZE};
2066         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2067         $column_info{default_value} = $info->{COLUMN_DEF};
2068         my $col_name = $info->{COLUMN_NAME};
2069         $col_name =~ s/^\"(.*)\"$/$1/;
2070
2071         $result{$col_name} = \%column_info;
2072       }
2073     };
2074     return \%result if !$@ && scalar keys %result;
2075   }
2076
2077   my %result;
2078   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2079   $sth->execute;
2080   my @columns = @{$sth->{NAME_lc}};
2081   for my $i ( 0 .. $#columns ){
2082     my %column_info;
2083     $column_info{data_type} = $sth->{TYPE}->[$i];
2084     $column_info{size} = $sth->{PRECISION}->[$i];
2085     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2086
2087     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2088       $column_info{data_type} = $1;
2089       $column_info{size}    = $2;
2090     }
2091
2092     $result{$columns[$i]} = \%column_info;
2093   }
2094   $sth->finish;
2095
2096   foreach my $col (keys %result) {
2097     my $colinfo = $result{$col};
2098     my $type_num = $colinfo->{data_type};
2099     my $type_name;
2100     if(defined $type_num && $dbh->can('type_info')) {
2101       my $type_info = $dbh->type_info($type_num);
2102       $type_name = $type_info->{TYPE_NAME} if $type_info;
2103       $colinfo->{data_type} = $type_name if $type_name;
2104     }
2105   }
2106
2107   return \%result;
2108 }
2109
2110 sub columns_info_for {
2111   my ($self, $table) = @_;
2112   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2113 }
2114
2115 =head2 last_insert_id
2116
2117 Return the row id of the last insert.
2118
2119 =cut
2120
2121 sub _dbh_last_insert_id {
2122     my ($self, $dbh, $source, $col) = @_;
2123
2124     my $id = eval { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2125
2126     return $id if defined $id;
2127
2128     my $class = ref $self;
2129     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2130 }
2131
2132 sub last_insert_id {
2133   my $self = shift;
2134   $self->_dbh_last_insert_id ($self->_dbh, @_);
2135 }
2136
2137 =head2 _native_data_type
2138
2139 =over 4
2140
2141 =item Arguments: $type_name
2142
2143 =back
2144
2145 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2146 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2147 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2148
2149 The default implementation returns C<undef>, implement in your Storage driver if
2150 you need this functionality.
2151
2152 Should map types from other databases to the native RDBMS type, for example
2153 C<VARCHAR2> to C<VARCHAR>.
2154
2155 Types with modifiers should map to the underlying data type. For example,
2156 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2157
2158 Composite types should map to the container type, for example
2159 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2160
2161 =cut
2162
2163 sub _native_data_type {
2164   #my ($self, $data_type) = @_;
2165   return undef
2166 }
2167
2168 # Check if placeholders are supported at all
2169 sub _placeholders_supported {
2170   my $self = shift;
2171   my $dbh  = $self->_get_dbh;
2172
2173   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2174   # but it is inaccurate more often than not
2175   eval {
2176     local $dbh->{PrintError} = 0;
2177     local $dbh->{RaiseError} = 1;
2178     $dbh->do('select ?', {}, 1);
2179   };
2180   return $@ ? 0 : 1;
2181 }
2182
2183 # Check if placeholders bound to non-string types throw exceptions
2184 #
2185 sub _typeless_placeholders_supported {
2186   my $self = shift;
2187   my $dbh  = $self->_get_dbh;
2188
2189   eval {
2190     local $dbh->{PrintError} = 0;
2191     local $dbh->{RaiseError} = 1;
2192     # this specifically tests a bind that is NOT a string
2193     $dbh->do('select 1 where 1 = ?', {}, 1);
2194   };
2195   return $@ ? 0 : 1;
2196 }
2197
2198 =head2 sqlt_type
2199
2200 Returns the database driver name.
2201
2202 =cut
2203
2204 sub sqlt_type {
2205   shift->_get_dbh->{Driver}->{Name};
2206 }
2207
2208 =head2 bind_attribute_by_data_type
2209
2210 Given a datatype from column info, returns a database specific bind
2211 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2212 let the database planner just handle it.
2213
2214 Generally only needed for special case column types, like bytea in postgres.
2215
2216 =cut
2217
2218 sub bind_attribute_by_data_type {
2219     return;
2220 }
2221
2222 =head2 is_datatype_numeric
2223
2224 Given a datatype from column_info, returns a boolean value indicating if
2225 the current RDBMS considers it a numeric value. This controls how
2226 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2227 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2228 be performed instead of the usual C<eq>.
2229
2230 =cut
2231
2232 sub is_datatype_numeric {
2233   my ($self, $dt) = @_;
2234
2235   return 0 unless $dt;
2236
2237   return $dt =~ /^ (?:
2238     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2239   ) $/ix;
2240 }
2241
2242
2243 =head2 create_ddl_dir
2244
2245 =over 4
2246
2247 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2248
2249 =back
2250
2251 Creates a SQL file based on the Schema, for each of the specified
2252 database engines in C<\@databases> in the given directory.
2253 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2254
2255 Given a previous version number, this will also create a file containing
2256 the ALTER TABLE statements to transform the previous schema into the
2257 current one. Note that these statements may contain C<DROP TABLE> or
2258 C<DROP COLUMN> statements that can potentially destroy data.
2259
2260 The file names are created using the C<ddl_filename> method below, please
2261 override this method in your schema if you would like a different file
2262 name format. For the ALTER file, the same format is used, replacing
2263 $version in the name with "$preversion-$version".
2264
2265 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2266 The most common value for this would be C<< { add_drop_table => 1 } >>
2267 to have the SQL produced include a C<DROP TABLE> statement for each table
2268 created. For quoting purposes supply C<quote_table_names> and
2269 C<quote_field_names>.
2270
2271 If no arguments are passed, then the following default values are assumed:
2272
2273 =over 4
2274
2275 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2276
2277 =item version    - $schema->schema_version
2278
2279 =item directory  - './'
2280
2281 =item preversion - <none>
2282
2283 =back
2284
2285 By default, C<\%sqlt_args> will have
2286
2287  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2288
2289 merged with the hash passed in. To disable any of those features, pass in a
2290 hashref like the following
2291
2292  { ignore_constraint_names => 0, # ... other options }
2293
2294
2295 WARNING: You are strongly advised to check all SQL files created, before applying
2296 them.
2297
2298 =cut
2299
2300 sub create_ddl_dir {
2301   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2302
2303   unless ($dir) {
2304     carp "No directory given, using ./\n";
2305     $dir = './';
2306   }
2307
2308   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2309
2310   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2311   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2312
2313   my $schema_version = $schema->schema_version || '1.x';
2314   $version ||= $schema_version;
2315
2316   $sqltargs = {
2317     add_drop_table => 1,
2318     ignore_constraint_names => 1,
2319     ignore_index_names => 1,
2320     %{$sqltargs || {}}
2321   };
2322
2323   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2324     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2325   }
2326
2327   my $sqlt = SQL::Translator->new( $sqltargs );
2328
2329   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2330   my $sqlt_schema = $sqlt->translate({ data => $schema })
2331     or $self->throw_exception ($sqlt->error);
2332
2333   foreach my $db (@$databases) {
2334     $sqlt->reset();
2335     $sqlt->{schema} = $sqlt_schema;
2336     $sqlt->producer($db);
2337
2338     my $file;
2339     my $filename = $schema->ddl_filename($db, $version, $dir);
2340     if (-e $filename && ($version eq $schema_version )) {
2341       # if we are dumping the current version, overwrite the DDL
2342       carp "Overwriting existing DDL file - $filename";
2343       unlink($filename);
2344     }
2345
2346     my $output = $sqlt->translate;
2347     if(!$output) {
2348       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2349       next;
2350     }
2351     if(!open($file, ">$filename")) {
2352       $self->throw_exception("Can't open $filename for writing ($!)");
2353       next;
2354     }
2355     print $file $output;
2356     close($file);
2357
2358     next unless ($preversion);
2359
2360     require SQL::Translator::Diff;
2361
2362     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2363     if(!-e $prefilename) {
2364       carp("No previous schema file found ($prefilename)");
2365       next;
2366     }
2367
2368     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2369     if(-e $difffile) {
2370       carp("Overwriting existing diff file - $difffile");
2371       unlink($difffile);
2372     }
2373
2374     my $source_schema;
2375     {
2376       my $t = SQL::Translator->new($sqltargs);
2377       $t->debug( 0 );
2378       $t->trace( 0 );
2379
2380       $t->parser( $db )
2381         or $self->throw_exception ($t->error);
2382
2383       my $out = $t->translate( $prefilename )
2384         or $self->throw_exception ($t->error);
2385
2386       $source_schema = $t->schema;
2387
2388       $source_schema->name( $prefilename )
2389         unless ( $source_schema->name );
2390     }
2391
2392     # The "new" style of producers have sane normalization and can support
2393     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2394     # And we have to diff parsed SQL against parsed SQL.
2395     my $dest_schema = $sqlt_schema;
2396
2397     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2398       my $t = SQL::Translator->new($sqltargs);
2399       $t->debug( 0 );
2400       $t->trace( 0 );
2401
2402       $t->parser( $db )
2403         or $self->throw_exception ($t->error);
2404
2405       my $out = $t->translate( $filename )
2406         or $self->throw_exception ($t->error);
2407
2408       $dest_schema = $t->schema;
2409
2410       $dest_schema->name( $filename )
2411         unless $dest_schema->name;
2412     }
2413
2414     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2415                                                   $dest_schema,   $db,
2416                                                   $sqltargs
2417                                                  );
2418     if(!open $file, ">$difffile") {
2419       $self->throw_exception("Can't write to $difffile ($!)");
2420       next;
2421     }
2422     print $file $diff;
2423     close($file);
2424   }
2425 }
2426
2427 =head2 deployment_statements
2428
2429 =over 4
2430
2431 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2432
2433 =back
2434
2435 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2436
2437 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2438 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2439
2440 C<$directory> is used to return statements from files in a previously created
2441 L</create_ddl_dir> directory and is optional. The filenames are constructed
2442 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2443
2444 If no C<$directory> is specified then the statements are constructed on the
2445 fly using L<SQL::Translator> and C<$version> is ignored.
2446
2447 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2448
2449 =cut
2450
2451 sub deployment_statements {
2452   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2453   $type ||= $self->sqlt_type;
2454   $version ||= $schema->schema_version || '1.x';
2455   $dir ||= './';
2456   my $filename = $schema->ddl_filename($type, $version, $dir);
2457   if(-f $filename)
2458   {
2459       my $file;
2460       open($file, "<$filename")
2461         or $self->throw_exception("Can't open $filename ($!)");
2462       my @rows = <$file>;
2463       close($file);
2464       return join('', @rows);
2465   }
2466
2467   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2468     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2469   }
2470
2471   # sources needs to be a parser arg, but for simplicty allow at top level
2472   # coming in
2473   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2474       if exists $sqltargs->{sources};
2475
2476   my $tr = SQL::Translator->new(
2477     producer => "SQL::Translator::Producer::${type}",
2478     %$sqltargs,
2479     parser => 'SQL::Translator::Parser::DBIx::Class',
2480     data => $schema,
2481   );
2482
2483   my @ret;
2484   my $wa = wantarray;
2485   if ($wa) {
2486     @ret = $tr->translate;
2487   }
2488   else {
2489     $ret[0] = $tr->translate;
2490   }
2491
2492   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2493     unless (@ret && defined $ret[0]);
2494
2495   return $wa ? @ret : $ret[0];
2496 }
2497
2498 sub deploy {
2499   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2500   my $deploy = sub {
2501     my $line = shift;
2502     return if($line =~ /^--/);
2503     return if(!$line);
2504     # next if($line =~ /^DROP/m);
2505     return if($line =~ /^BEGIN TRANSACTION/m);
2506     return if($line =~ /^COMMIT/m);
2507     return if $line =~ /^\s+$/; # skip whitespace only
2508     $self->_query_start($line);
2509     eval {
2510       # do a dbh_do cycle here, as we need some error checking in
2511       # place (even though we will ignore errors)
2512       $self->dbh_do (sub { $_[1]->do($line) });
2513     };
2514     if ($@) {
2515       carp qq{$@ (running "${line}")};
2516     }
2517     $self->_query_end($line);
2518   };
2519   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2520   if (@statements > 1) {
2521     foreach my $statement (@statements) {
2522       $deploy->( $statement );
2523     }
2524   }
2525   elsif (@statements == 1) {
2526     foreach my $line ( split(";\n", $statements[0])) {
2527       $deploy->( $line );
2528     }
2529   }
2530 }
2531
2532 =head2 datetime_parser
2533
2534 Returns the datetime parser class
2535
2536 =cut
2537
2538 sub datetime_parser {
2539   my $self = shift;
2540   return $self->{datetime_parser} ||= do {
2541     $self->build_datetime_parser(@_);
2542   };
2543 }
2544
2545 =head2 datetime_parser_type
2546
2547 Defines (returns) the datetime parser class - currently hardwired to
2548 L<DateTime::Format::MySQL>
2549
2550 =cut
2551
2552 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2553
2554 =head2 build_datetime_parser
2555
2556 See L</datetime_parser>
2557
2558 =cut
2559
2560 sub build_datetime_parser {
2561   my $self = shift;
2562   my $type = $self->datetime_parser_type(@_);
2563   $self->ensure_class_loaded ($type);
2564   return $type;
2565 }
2566
2567
2568 =head2 is_replicating
2569
2570 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2571 replicate from a master database.  Default is undef, which is the result
2572 returned by databases that don't support replication.
2573
2574 =cut
2575
2576 sub is_replicating {
2577     return;
2578
2579 }
2580
2581 =head2 lag_behind_master
2582
2583 Returns a number that represents a certain amount of lag behind a master db
2584 when a given storage is replicating.  The number is database dependent, but
2585 starts at zero and increases with the amount of lag. Default in undef
2586
2587 =cut
2588
2589 sub lag_behind_master {
2590     return;
2591 }
2592
2593 =head2 relname_to_table_alias
2594
2595 =over 4
2596
2597 =item Arguments: $relname, $join_count
2598
2599 =back
2600
2601 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2602 queries.
2603
2604 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2605 way these aliases are named.
2606
2607 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2608 otherwise C<"$relname">.
2609
2610 =cut
2611
2612 sub relname_to_table_alias {
2613   my ($self, $relname, $join_count) = @_;
2614
2615   my $alias = ($join_count && $join_count > 1 ?
2616     join('_', $relname, $join_count) : $relname);
2617
2618   return $alias;
2619 }
2620
2621 sub DESTROY {
2622   my $self = shift;
2623
2624   $self->_verify_pid if $self->_dbh;
2625
2626   # some databases need this to stop spewing warnings
2627   if (my $dbh = $self->_dbh) {
2628     local $@;
2629     eval {
2630       %{ $dbh->{CachedKids} } = ();
2631       $dbh->disconnect;
2632     };
2633   }
2634
2635   $self->_dbh(undef);
2636 }
2637
2638 1;
2639
2640 =head1 USAGE NOTES
2641
2642 =head2 DBIx::Class and AutoCommit
2643
2644 DBIx::Class can do some wonderful magic with handling exceptions,
2645 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2646 (the default) combined with C<txn_do> for transaction support.
2647
2648 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2649 in an assumed transaction between commits, and you're telling us you'd
2650 like to manage that manually.  A lot of the magic protections offered by
2651 this module will go away.  We can't protect you from exceptions due to database
2652 disconnects because we don't know anything about how to restart your
2653 transactions.  You're on your own for handling all sorts of exceptional
2654 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2655 be with raw DBI.
2656
2657
2658 =head1 AUTHORS
2659
2660 Matt S. Trout <mst@shadowcatsystems.co.uk>
2661
2662 Andy Grundman <andy@hybridized.org>
2663
2664 =head1 LICENSE
2665
2666 You may distribute this code under the same terms as Perl itself.
2667
2668 =cut