missing local
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17 use Sub::Name ();
18
19 __PACKAGE__->mk_group_accessors('simple' =>
20   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
21      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints
22      __server_info/
23 );
24
25 # the values for these accessors are picked out (and deleted) from
26 # the attribute hashref passed to connect_info
27 my @storage_options = qw/
28   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
29   disable_sth_caching unsafe auto_savepoint
30 /;
31 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
32
33
34 # default cursor class, overridable in connect_info attributes
35 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
36
37 __PACKAGE__->mk_group_accessors('inherited' => qw/
38   sql_maker_class
39   can_insert_returning
40 /);
41 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
42
43
44 # Each of these methods need _determine_driver called before itself
45 # in order to function reliably. This is a purely DRY optimization
46 my @rdbms_specific_methods = qw/
47   deployment_statements
48   sqlt_type
49   build_datetime_parser
50   datetime_parser_type
51
52   insert
53   insert_bulk
54   update
55   delete
56   select
57   select_single
58 /;
59
60 for my $meth (@rdbms_specific_methods) {
61
62   my $orig = __PACKAGE__->can ($meth)
63     or next;
64
65   no strict qw/refs/;
66   no warnings qw/redefine/;
67   *{__PACKAGE__ ."::$meth"} = Sub::Name::subname $meth => sub {
68     if (not $_[0]->_driver_determined) {
69       $_[0]->_determine_driver;
70       goto $_[0]->can($meth);
71     }
72     $orig->(@_);
73   };
74 }
75
76
77 =head1 NAME
78
79 DBIx::Class::Storage::DBI - DBI storage handler
80
81 =head1 SYNOPSIS
82
83   my $schema = MySchema->connect('dbi:SQLite:my.db');
84
85   $schema->storage->debug(1);
86
87   my @stuff = $schema->storage->dbh_do(
88     sub {
89       my ($storage, $dbh, @args) = @_;
90       $dbh->do("DROP TABLE authors");
91     },
92     @column_list
93   );
94
95   $schema->resultset('Book')->search({
96      written_on => $schema->storage->datetime_parser(DateTime->now)
97   });
98
99 =head1 DESCRIPTION
100
101 This class represents the connection to an RDBMS via L<DBI>.  See
102 L<DBIx::Class::Storage> for general information.  This pod only
103 documents DBI-specific methods and behaviors.
104
105 =head1 METHODS
106
107 =cut
108
109 sub new {
110   my $new = shift->next::method(@_);
111
112   $new->transaction_depth(0);
113   $new->_sql_maker_opts({});
114   $new->{savepoints} = [];
115   $new->{_in_dbh_do} = 0;
116   $new->{_dbh_gen} = 0;
117
118   $new;
119 }
120
121 =head2 connect_info
122
123 This method is normally called by L<DBIx::Class::Schema/connection>, which
124 encapsulates its argument list in an arrayref before passing them here.
125
126 The argument list may contain:
127
128 =over
129
130 =item *
131
132 The same 4-element argument set one would normally pass to
133 L<DBI/connect>, optionally followed by
134 L<extra attributes|/DBIx::Class specific connection attributes>
135 recognized by DBIx::Class:
136
137   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
138
139 =item *
140
141 A single code reference which returns a connected
142 L<DBI database handle|DBI/connect> optionally followed by
143 L<extra attributes|/DBIx::Class specific connection attributes> recognized
144 by DBIx::Class:
145
146   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
147
148 =item *
149
150 A single hashref with all the attributes and the dsn/user/password
151 mixed together:
152
153   $connect_info_args = [{
154     dsn => $dsn,
155     user => $user,
156     password => $pass,
157     %dbi_attributes,
158     %extra_attributes,
159   }];
160
161   $connect_info_args = [{
162     dbh_maker => sub { DBI->connect (...) },
163     %dbi_attributes,
164     %extra_attributes,
165   }];
166
167 This is particularly useful for L<Catalyst> based applications, allowing the
168 following config (L<Config::General> style):
169
170   <Model::DB>
171     schema_class   App::DB
172     <connect_info>
173       dsn          dbi:mysql:database=test
174       user         testuser
175       password     TestPass
176       AutoCommit   1
177     </connect_info>
178   </Model::DB>
179
180 The C<dsn>/C<user>/C<password> combination can be substituted by the
181 C<dbh_maker> key whose value is a coderef that returns a connected
182 L<DBI database handle|DBI/connect>
183
184 =back
185
186 Please note that the L<DBI> docs recommend that you always explicitly
187 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
188 recommends that it be set to I<1>, and that you perform transactions
189 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
190 to I<1> if you do not do explicitly set it to zero.  This is the default
191 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
192
193 =head3 DBIx::Class specific connection attributes
194
195 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
196 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
197 the following connection options. These options can be mixed in with your other
198 L<DBI> connection attributes, or placed in a separate hashref
199 (C<\%extra_attributes>) as shown above.
200
201 Every time C<connect_info> is invoked, any previous settings for
202 these options will be cleared before setting the new ones, regardless of
203 whether any options are specified in the new C<connect_info>.
204
205
206 =over
207
208 =item on_connect_do
209
210 Specifies things to do immediately after connecting or re-connecting to
211 the database.  Its value may contain:
212
213 =over
214
215 =item a scalar
216
217 This contains one SQL statement to execute.
218
219 =item an array reference
220
221 This contains SQL statements to execute in order.  Each element contains
222 a string or a code reference that returns a string.
223
224 =item a code reference
225
226 This contains some code to execute.  Unlike code references within an
227 array reference, its return value is ignored.
228
229 =back
230
231 =item on_disconnect_do
232
233 Takes arguments in the same form as L</on_connect_do> and executes them
234 immediately before disconnecting from the database.
235
236 Note, this only runs if you explicitly call L</disconnect> on the
237 storage object.
238
239 =item on_connect_call
240
241 A more generalized form of L</on_connect_do> that calls the specified
242 C<connect_call_METHOD> methods in your storage driver.
243
244   on_connect_do => 'select 1'
245
246 is equivalent to:
247
248   on_connect_call => [ [ do_sql => 'select 1' ] ]
249
250 Its values may contain:
251
252 =over
253
254 =item a scalar
255
256 Will call the C<connect_call_METHOD> method.
257
258 =item a code reference
259
260 Will execute C<< $code->($storage) >>
261
262 =item an array reference
263
264 Each value can be a method name or code reference.
265
266 =item an array of arrays
267
268 For each array, the first item is taken to be the C<connect_call_> method name
269 or code reference, and the rest are parameters to it.
270
271 =back
272
273 Some predefined storage methods you may use:
274
275 =over
276
277 =item do_sql
278
279 Executes a SQL string or a code reference that returns a SQL string. This is
280 what L</on_connect_do> and L</on_disconnect_do> use.
281
282 It can take:
283
284 =over
285
286 =item a scalar
287
288 Will execute the scalar as SQL.
289
290 =item an arrayref
291
292 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
293 attributes hashref and bind values.
294
295 =item a code reference
296
297 Will execute C<< $code->($storage) >> and execute the return array refs as
298 above.
299
300 =back
301
302 =item datetime_setup
303
304 Execute any statements necessary to initialize the database session to return
305 and accept datetime/timestamp values used with
306 L<DBIx::Class::InflateColumn::DateTime>.
307
308 Only necessary for some databases, see your specific storage driver for
309 implementation details.
310
311 =back
312
313 =item on_disconnect_call
314
315 Takes arguments in the same form as L</on_connect_call> and executes them
316 immediately before disconnecting from the database.
317
318 Calls the C<disconnect_call_METHOD> methods as opposed to the
319 C<connect_call_METHOD> methods called by L</on_connect_call>.
320
321 Note, this only runs if you explicitly call L</disconnect> on the
322 storage object.
323
324 =item disable_sth_caching
325
326 If set to a true value, this option will disable the caching of
327 statement handles via L<DBI/prepare_cached>.
328
329 =item limit_dialect
330
331 Sets the limit dialect. This is useful for JDBC-bridge among others
332 where the remote SQL-dialect cannot be determined by the name of the
333 driver alone. See also L<SQL::Abstract::Limit>.
334
335 =item quote_char
336
337 Specifies what characters to use to quote table and column names. If
338 you use this you will want to specify L</name_sep> as well.
339
340 C<quote_char> expects either a single character, in which case is it
341 is placed on either side of the table/column name, or an arrayref of length
342 2 in which case the table/column name is placed between the elements.
343
344 For example under MySQL you should use C<< quote_char => '`' >>, and for
345 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
346
347 =item name_sep
348
349 This only needs to be used in conjunction with C<quote_char>, and is used to
350 specify the character that separates elements (schemas, tables, columns) from
351 each other. In most cases this is simply a C<.>.
352
353 The consequences of not supplying this value is that L<SQL::Abstract>
354 will assume DBIx::Class' uses of aliases to be complete column
355 names. The output will look like I<"me.name"> when it should actually
356 be I<"me"."name">.
357
358 =item unsafe
359
360 This Storage driver normally installs its own C<HandleError>, sets
361 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
362 all database handles, including those supplied by a coderef.  It does this
363 so that it can have consistent and useful error behavior.
364
365 If you set this option to a true value, Storage will not do its usual
366 modifications to the database handle's attributes, and instead relies on
367 the settings in your connect_info DBI options (or the values you set in
368 your connection coderef, in the case that you are connecting via coderef).
369
370 Note that your custom settings can cause Storage to malfunction,
371 especially if you set a C<HandleError> handler that suppresses exceptions
372 and/or disable C<RaiseError>.
373
374 =item auto_savepoint
375
376 If this option is true, L<DBIx::Class> will use savepoints when nesting
377 transactions, making it possible to recover from failure in the inner
378 transaction without having to abort all outer transactions.
379
380 =item cursor_class
381
382 Use this argument to supply a cursor class other than the default
383 L<DBIx::Class::Storage::DBI::Cursor>.
384
385 =back
386
387 Some real-life examples of arguments to L</connect_info> and
388 L<DBIx::Class::Schema/connect>
389
390   # Simple SQLite connection
391   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
392
393   # Connect via subref
394   ->connect_info([ sub { DBI->connect(...) } ]);
395
396   # Connect via subref in hashref
397   ->connect_info([{
398     dbh_maker => sub { DBI->connect(...) },
399     on_connect_do => 'alter session ...',
400   }]);
401
402   # A bit more complicated
403   ->connect_info(
404     [
405       'dbi:Pg:dbname=foo',
406       'postgres',
407       'my_pg_password',
408       { AutoCommit => 1 },
409       { quote_char => q{"}, name_sep => q{.} },
410     ]
411   );
412
413   # Equivalent to the previous example
414   ->connect_info(
415     [
416       'dbi:Pg:dbname=foo',
417       'postgres',
418       'my_pg_password',
419       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
420     ]
421   );
422
423   # Same, but with hashref as argument
424   # See parse_connect_info for explanation
425   ->connect_info(
426     [{
427       dsn         => 'dbi:Pg:dbname=foo',
428       user        => 'postgres',
429       password    => 'my_pg_password',
430       AutoCommit  => 1,
431       quote_char  => q{"},
432       name_sep    => q{.},
433     }]
434   );
435
436   # Subref + DBIx::Class-specific connection options
437   ->connect_info(
438     [
439       sub { DBI->connect(...) },
440       {
441           quote_char => q{`},
442           name_sep => q{@},
443           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
444           disable_sth_caching => 1,
445       },
446     ]
447   );
448
449
450
451 =cut
452
453 sub connect_info {
454   my ($self, $info) = @_;
455
456   return $self->_connect_info if !$info;
457
458   $self->_connect_info($info); # copy for _connect_info
459
460   $info = $self->_normalize_connect_info($info)
461     if ref $info eq 'ARRAY';
462
463   for my $storage_opt (keys %{ $info->{storage_options} }) {
464     my $value = $info->{storage_options}{$storage_opt};
465
466     $self->$storage_opt($value);
467   }
468
469   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
470   #  the new set of options
471   $self->_sql_maker(undef);
472   $self->_sql_maker_opts({});
473
474   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
475     my $value = $info->{sql_maker_options}{$sql_maker_opt};
476
477     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
478   }
479
480   my %attrs = (
481     %{ $self->_default_dbi_connect_attributes || {} },
482     %{ $info->{attributes} || {} },
483   );
484
485   my @args = @{ $info->{arguments} };
486
487   $self->_dbi_connect_info([@args,
488     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
489
490   return $self->_connect_info;
491 }
492
493 sub _normalize_connect_info {
494   my ($self, $info_arg) = @_;
495   my %info;
496
497   my @args = @$info_arg;  # take a shallow copy for further mutilation
498
499   # combine/pre-parse arguments depending on invocation style
500
501   my %attrs;
502   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
503     %attrs = %{ $args[1] || {} };
504     @args = $args[0];
505   }
506   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
507     %attrs = %{$args[0]};
508     @args = ();
509     if (my $code = delete $attrs{dbh_maker}) {
510       @args = $code;
511
512       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
513       if (@ignored) {
514         carp sprintf (
515             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
516           . "to the result of 'dbh_maker'",
517
518           join (', ', map { "'$_'" } (@ignored) ),
519         );
520       }
521     }
522     else {
523       @args = delete @attrs{qw/dsn user password/};
524     }
525   }
526   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
527     %attrs = (
528       % { $args[3] || {} },
529       % { $args[4] || {} },
530     );
531     @args = @args[0,1,2];
532   }
533
534   $info{arguments} = \@args;
535
536   my @storage_opts = grep exists $attrs{$_},
537     @storage_options, 'cursor_class';
538
539   @{ $info{storage_options} }{@storage_opts} =
540     delete @attrs{@storage_opts} if @storage_opts;
541
542   my @sql_maker_opts = grep exists $attrs{$_},
543     qw/limit_dialect quote_char name_sep/;
544
545   @{ $info{sql_maker_options} }{@sql_maker_opts} =
546     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
547
548   $info{attributes} = \%attrs if %attrs;
549
550   return \%info;
551 }
552
553 sub _default_dbi_connect_attributes {
554   return {
555     AutoCommit => 1,
556     RaiseError => 1,
557     PrintError => 0,
558   };
559 }
560
561 =head2 on_connect_do
562
563 This method is deprecated in favour of setting via L</connect_info>.
564
565 =cut
566
567 =head2 on_disconnect_do
568
569 This method is deprecated in favour of setting via L</connect_info>.
570
571 =cut
572
573 sub _parse_connect_do {
574   my ($self, $type) = @_;
575
576   my $val = $self->$type;
577   return () if not defined $val;
578
579   my @res;
580
581   if (not ref($val)) {
582     push @res, [ 'do_sql', $val ];
583   } elsif (ref($val) eq 'CODE') {
584     push @res, $val;
585   } elsif (ref($val) eq 'ARRAY') {
586     push @res, map { [ 'do_sql', $_ ] } @$val;
587   } else {
588     $self->throw_exception("Invalid type for $type: ".ref($val));
589   }
590
591   return \@res;
592 }
593
594 =head2 dbh_do
595
596 Arguments: ($subref | $method_name), @extra_coderef_args?
597
598 Execute the given $subref or $method_name using the new exception-based
599 connection management.
600
601 The first two arguments will be the storage object that C<dbh_do> was called
602 on and a database handle to use.  Any additional arguments will be passed
603 verbatim to the called subref as arguments 2 and onwards.
604
605 Using this (instead of $self->_dbh or $self->dbh) ensures correct
606 exception handling and reconnection (or failover in future subclasses).
607
608 Your subref should have no side-effects outside of the database, as
609 there is the potential for your subref to be partially double-executed
610 if the database connection was stale/dysfunctional.
611
612 Example:
613
614   my @stuff = $schema->storage->dbh_do(
615     sub {
616       my ($storage, $dbh, @cols) = @_;
617       my $cols = join(q{, }, @cols);
618       $dbh->selectrow_array("SELECT $cols FROM foo");
619     },
620     @column_list
621   );
622
623 =cut
624
625 sub dbh_do {
626   my $self = shift;
627   my $code = shift;
628
629   my $dbh = $self->_get_dbh;
630
631   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
632       || $self->{transaction_depth};
633
634   local $self->{_in_dbh_do} = 1;
635
636   my @result;
637   my $want_array = wantarray;
638
639   eval {
640
641     if($want_array) {
642         @result = $self->$code($dbh, @_);
643     }
644     elsif(defined $want_array) {
645         $result[0] = $self->$code($dbh, @_);
646     }
647     else {
648         $self->$code($dbh, @_);
649     }
650   };
651
652   # ->connected might unset $@ - copy
653   my $exception = $@;
654   if(!$exception) { return $want_array ? @result : $result[0] }
655
656   $self->throw_exception($exception) if $self->connected;
657
658   # We were not connected - reconnect and retry, but let any
659   #  exception fall right through this time
660   carp "Retrying $code after catching disconnected exception: $exception"
661     if $ENV{DBIC_DBIRETRY_DEBUG};
662   $self->_populate_dbh;
663   $self->$code($self->_dbh, @_);
664 }
665
666 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
667 # It also informs dbh_do to bypass itself while under the direction of txn_do,
668 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
669 sub txn_do {
670   my $self = shift;
671   my $coderef = shift;
672
673   ref $coderef eq 'CODE' or $self->throw_exception
674     ('$coderef must be a CODE reference');
675
676   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
677
678   local $self->{_in_dbh_do} = 1;
679
680   my @result;
681   my $want_array = wantarray;
682
683   my $tried = 0;
684   while(1) {
685     eval {
686       $self->_get_dbh;
687
688       $self->txn_begin;
689       if($want_array) {
690           @result = $coderef->(@_);
691       }
692       elsif(defined $want_array) {
693           $result[0] = $coderef->(@_);
694       }
695       else {
696           $coderef->(@_);
697       }
698       $self->txn_commit;
699     };
700
701     # ->connected might unset $@ - copy
702     my $exception = $@;
703     if(!$exception) { return $want_array ? @result : $result[0] }
704
705     if($tried++ || $self->connected) {
706       eval { $self->txn_rollback };
707       my $rollback_exception = $@;
708       if($rollback_exception) {
709         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
710         $self->throw_exception($exception)  # propagate nested rollback
711           if $rollback_exception =~ /$exception_class/;
712
713         $self->throw_exception(
714           "Transaction aborted: ${exception}. "
715           . "Rollback failed: ${rollback_exception}"
716         );
717       }
718       $self->throw_exception($exception)
719     }
720
721     # We were not connected, and was first try - reconnect and retry
722     # via the while loop
723     carp "Retrying $coderef after catching disconnected exception: $exception"
724       if $ENV{DBIC_DBIRETRY_DEBUG};
725     $self->_populate_dbh;
726   }
727 }
728
729 =head2 disconnect
730
731 Our C<disconnect> method also performs a rollback first if the
732 database is not in C<AutoCommit> mode.
733
734 =cut
735
736 sub disconnect {
737   my ($self) = @_;
738
739   if( $self->_dbh ) {
740     my @actions;
741
742     push @actions, ( $self->on_disconnect_call || () );
743     push @actions, $self->_parse_connect_do ('on_disconnect_do');
744
745     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
746
747     $self->_dbh_rollback unless $self->_dbh_autocommit;
748
749     %{ $self->_dbh->{CachedKids} } = ();
750     $self->_dbh->disconnect;
751     $self->_dbh(undef);
752     $self->{_dbh_gen}++;
753   }
754 }
755
756 =head2 with_deferred_fk_checks
757
758 =over 4
759
760 =item Arguments: C<$coderef>
761
762 =item Return Value: The return value of $coderef
763
764 =back
765
766 Storage specific method to run the code ref with FK checks deferred or
767 in MySQL's case disabled entirely.
768
769 =cut
770
771 # Storage subclasses should override this
772 sub with_deferred_fk_checks {
773   my ($self, $sub) = @_;
774   $sub->();
775 }
776
777 =head2 connected
778
779 =over
780
781 =item Arguments: none
782
783 =item Return Value: 1|0
784
785 =back
786
787 Verifies that the current database handle is active and ready to execute
788 an SQL statement (e.g. the connection did not get stale, server is still
789 answering, etc.) This method is used internally by L</dbh>.
790
791 =cut
792
793 sub connected {
794   my $self = shift;
795   return 0 unless $self->_seems_connected;
796
797   #be on the safe side
798   local $self->_dbh->{RaiseError} = 1;
799
800   return $self->_ping;
801 }
802
803 sub _seems_connected {
804   my $self = shift;
805
806   my $dbh = $self->_dbh
807     or return 0;
808
809   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
810     $self->_dbh(undef);
811     $self->{_dbh_gen}++;
812     return 0;
813   }
814   else {
815     $self->_verify_pid;
816     return 0 if !$self->_dbh;
817   }
818
819   return $dbh->FETCH('Active');
820 }
821
822 sub _ping {
823   my $self = shift;
824
825   my $dbh = $self->_dbh or return 0;
826
827   return $dbh->ping;
828 }
829
830 # handle pid changes correctly
831 #  NOTE: assumes $self->_dbh is a valid $dbh
832 sub _verify_pid {
833   my ($self) = @_;
834
835   return if defined $self->_conn_pid && $self->_conn_pid == $$;
836
837   $self->_dbh->{InactiveDestroy} = 1;
838   $self->_dbh(undef);
839   $self->{_dbh_gen}++;
840
841   return;
842 }
843
844 sub ensure_connected {
845   my ($self) = @_;
846
847   unless ($self->connected) {
848     $self->_populate_dbh;
849   }
850 }
851
852 =head2 dbh
853
854 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
855 is guaranteed to be healthy by implicitly calling L</connected>, and if
856 necessary performing a reconnection before returning. Keep in mind that this
857 is very B<expensive> on some database engines. Consider using L</dbh_do>
858 instead.
859
860 =cut
861
862 sub dbh {
863   my ($self) = @_;
864
865   if (not $self->_dbh) {
866     $self->_populate_dbh;
867   } else {
868     $self->ensure_connected;
869   }
870   return $self->_dbh;
871 }
872
873 # this is the internal "get dbh or connect (don't check)" method
874 sub _get_dbh {
875   my $self = shift;
876   $self->_verify_pid if $self->_dbh;
877   $self->_populate_dbh unless $self->_dbh;
878   return $self->_dbh;
879 }
880
881 sub _sql_maker_args {
882     my ($self) = @_;
883
884     return (
885       bindtype=>'columns',
886       array_datatypes => 1,
887       limit_dialect => $self->_get_dbh,
888       %{$self->_sql_maker_opts}
889     );
890 }
891
892 sub sql_maker {
893   my ($self) = @_;
894   unless ($self->_sql_maker) {
895     my $sql_maker_class = $self->sql_maker_class;
896     $self->ensure_class_loaded ($sql_maker_class);
897     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
898   }
899   return $self->_sql_maker;
900 }
901
902 # nothing to do by default
903 sub _rebless {}
904 sub _init {}
905
906 sub _populate_dbh {
907   my ($self) = @_;
908
909   my @info = @{$self->_dbi_connect_info || []};
910   $self->_dbh(undef); # in case ->connected failed we might get sent here
911   $self->_dbh($self->_connect(@info));
912
913   $self->_conn_pid($$);
914   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
915
916   $self->_determine_driver;
917
918   # Always set the transaction depth on connect, since
919   #  there is no transaction in progress by definition
920   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
921
922   $self->_run_connection_actions unless $self->{_in_determine_driver};
923
924   $self->_populate_server_info;
925 }
926
927 sub _run_connection_actions {
928   my $self = shift;
929   my @actions;
930
931   push @actions, ( $self->on_connect_call || () );
932   push @actions, $self->_parse_connect_do ('on_connect_do');
933
934   $self->_do_connection_actions(connect_call_ => $_) for @actions;
935 }
936
937 sub _populate_server_info {
938   my $self = shift;
939   my %info;
940
941   my $dbms_ver = eval {
942       local $@;
943       local $SIG{__WARN__} = sub {};
944       $self->_get_dbh->get_info(18)
945   };
946
947   if (defined $dbms_ver) {
948     $info{dbms_ver} = $dbms_ver;
949
950     ($dbms_ver) = $dbms_ver =~ /^(\S+)/;
951
952     my @verparts = split /\./, $dbms_ver;
953     $info{dbms_ver_normalized} = sprintf "%d.%03d%03d", @verparts;
954   }
955
956   $self->__server_info(\%info);
957
958   return \%info;
959 }
960
961 sub _server_info {
962   my $self = shift;
963
964   $self->_get_dbh;
965
966   return $self->__server_info(@_);
967 }
968
969 sub _determine_driver {
970   my ($self) = @_;
971
972   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
973     my $started_connected = 0;
974     local $self->{_in_determine_driver} = 1;
975
976     if (ref($self) eq __PACKAGE__) {
977       my $driver;
978       if ($self->_dbh) { # we are connected
979         $driver = $self->_dbh->{Driver}{Name};
980         $started_connected = 1;
981       } else {
982         # if connect_info is a CODEREF, we have no choice but to connect
983         if (ref $self->_dbi_connect_info->[0] &&
984             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
985           $self->_populate_dbh;
986           $driver = $self->_dbh->{Driver}{Name};
987         }
988         else {
989           # try to use dsn to not require being connected, the driver may still
990           # force a connection in _rebless to determine version
991           # (dsn may not be supplied at all if all we do is make a mock-schema)
992           my $dsn = $self->_dbi_connect_info->[0] || '';
993           ($driver) = $dsn =~ /dbi:([^:]+):/i;
994         }
995       }
996
997       if ($driver) {
998         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
999         if ($self->load_optional_class($storage_class)) {
1000           mro::set_mro($storage_class, 'c3');
1001           bless $self, $storage_class;
1002           $self->_rebless();
1003         }
1004       }
1005     }
1006
1007     $self->_driver_determined(1);
1008
1009     $self->_init; # run driver-specific initializations
1010
1011     $self->_run_connection_actions
1012         if !$started_connected && defined $self->_dbh;
1013   }
1014 }
1015
1016 sub _do_connection_actions {
1017   my $self          = shift;
1018   my $method_prefix = shift;
1019   my $call          = shift;
1020
1021   if (not ref($call)) {
1022     my $method = $method_prefix . $call;
1023     $self->$method(@_);
1024   } elsif (ref($call) eq 'CODE') {
1025     $self->$call(@_);
1026   } elsif (ref($call) eq 'ARRAY') {
1027     if (ref($call->[0]) ne 'ARRAY') {
1028       $self->_do_connection_actions($method_prefix, $_) for @$call;
1029     } else {
1030       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1031     }
1032   } else {
1033     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1034   }
1035
1036   return $self;
1037 }
1038
1039 sub connect_call_do_sql {
1040   my $self = shift;
1041   $self->_do_query(@_);
1042 }
1043
1044 sub disconnect_call_do_sql {
1045   my $self = shift;
1046   $self->_do_query(@_);
1047 }
1048
1049 # override in db-specific backend when necessary
1050 sub connect_call_datetime_setup { 1 }
1051
1052 sub _do_query {
1053   my ($self, $action) = @_;
1054
1055   if (ref $action eq 'CODE') {
1056     $action = $action->($self);
1057     $self->_do_query($_) foreach @$action;
1058   }
1059   else {
1060     # Most debuggers expect ($sql, @bind), so we need to exclude
1061     # the attribute hash which is the second argument to $dbh->do
1062     # furthermore the bind values are usually to be presented
1063     # as named arrayref pairs, so wrap those here too
1064     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1065     my $sql = shift @do_args;
1066     my $attrs = shift @do_args;
1067     my @bind = map { [ undef, $_ ] } @do_args;
1068
1069     $self->_query_start($sql, @bind);
1070     $self->_get_dbh->do($sql, $attrs, @do_args);
1071     $self->_query_end($sql, @bind);
1072   }
1073
1074   return $self;
1075 }
1076
1077 sub _connect {
1078   my ($self, @info) = @_;
1079
1080   $self->throw_exception("You failed to provide any connection info")
1081     if !@info;
1082
1083   my ($old_connect_via, $dbh);
1084
1085   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1086     $old_connect_via = $DBI::connect_via;
1087     $DBI::connect_via = 'connect';
1088   }
1089
1090   eval {
1091     if(ref $info[0] eq 'CODE') {
1092        $dbh = $info[0]->();
1093     }
1094     else {
1095        $dbh = DBI->connect(@info);
1096     }
1097
1098     if($dbh && !$self->unsafe) {
1099       my $weak_self = $self;
1100       Scalar::Util::weaken($weak_self);
1101       $dbh->{HandleError} = sub {
1102           if ($weak_self) {
1103             $weak_self->throw_exception("DBI Exception: $_[0]");
1104           }
1105           else {
1106             # the handler may be invoked by something totally out of
1107             # the scope of DBIC
1108             croak ("DBI Exception: $_[0]");
1109           }
1110       };
1111       $dbh->{ShowErrorStatement} = 1;
1112       $dbh->{RaiseError} = 1;
1113       $dbh->{PrintError} = 0;
1114     }
1115   };
1116
1117   $DBI::connect_via = $old_connect_via if $old_connect_via;
1118
1119   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1120     if !$dbh || $@;
1121
1122   $self->_dbh_autocommit($dbh->{AutoCommit});
1123
1124   $dbh;
1125 }
1126
1127 sub svp_begin {
1128   my ($self, $name) = @_;
1129
1130   $name = $self->_svp_generate_name
1131     unless defined $name;
1132
1133   $self->throw_exception ("You can't use savepoints outside a transaction")
1134     if $self->{transaction_depth} == 0;
1135
1136   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1137     unless $self->can('_svp_begin');
1138
1139   push @{ $self->{savepoints} }, $name;
1140
1141   $self->debugobj->svp_begin($name) if $self->debug;
1142
1143   return $self->_svp_begin($name);
1144 }
1145
1146 sub svp_release {
1147   my ($self, $name) = @_;
1148
1149   $self->throw_exception ("You can't use savepoints outside a transaction")
1150     if $self->{transaction_depth} == 0;
1151
1152   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1153     unless $self->can('_svp_release');
1154
1155   if (defined $name) {
1156     $self->throw_exception ("Savepoint '$name' does not exist")
1157       unless grep { $_ eq $name } @{ $self->{savepoints} };
1158
1159     # Dig through the stack until we find the one we are releasing.  This keeps
1160     # the stack up to date.
1161     my $svp;
1162
1163     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1164   } else {
1165     $name = pop @{ $self->{savepoints} };
1166   }
1167
1168   $self->debugobj->svp_release($name) if $self->debug;
1169
1170   return $self->_svp_release($name);
1171 }
1172
1173 sub svp_rollback {
1174   my ($self, $name) = @_;
1175
1176   $self->throw_exception ("You can't use savepoints outside a transaction")
1177     if $self->{transaction_depth} == 0;
1178
1179   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1180     unless $self->can('_svp_rollback');
1181
1182   if (defined $name) {
1183       # If they passed us a name, verify that it exists in the stack
1184       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1185           $self->throw_exception("Savepoint '$name' does not exist!");
1186       }
1187
1188       # Dig through the stack until we find the one we are releasing.  This keeps
1189       # the stack up to date.
1190       while(my $s = pop(@{ $self->{savepoints} })) {
1191           last if($s eq $name);
1192       }
1193       # Add the savepoint back to the stack, as a rollback doesn't remove the
1194       # named savepoint, only everything after it.
1195       push(@{ $self->{savepoints} }, $name);
1196   } else {
1197       # We'll assume they want to rollback to the last savepoint
1198       $name = $self->{savepoints}->[-1];
1199   }
1200
1201   $self->debugobj->svp_rollback($name) if $self->debug;
1202
1203   return $self->_svp_rollback($name);
1204 }
1205
1206 sub _svp_generate_name {
1207     my ($self) = @_;
1208
1209     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1210 }
1211
1212 sub txn_begin {
1213   my $self = shift;
1214
1215   # this means we have not yet connected and do not know the AC status
1216   # (e.g. coderef $dbh)
1217   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1218
1219   if($self->{transaction_depth} == 0) {
1220     $self->debugobj->txn_begin()
1221       if $self->debug;
1222     $self->_dbh_begin_work;
1223   }
1224   elsif ($self->auto_savepoint) {
1225     $self->svp_begin;
1226   }
1227   $self->{transaction_depth}++;
1228 }
1229
1230 sub _dbh_begin_work {
1231   my $self = shift;
1232
1233   # if the user is utilizing txn_do - good for him, otherwise we need to
1234   # ensure that the $dbh is healthy on BEGIN.
1235   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1236   # will be replaced by a failure of begin_work itself (which will be
1237   # then retried on reconnect)
1238   if ($self->{_in_dbh_do}) {
1239     $self->_dbh->begin_work;
1240   } else {
1241     $self->dbh_do(sub { $_[1]->begin_work });
1242   }
1243 }
1244
1245 sub txn_commit {
1246   my $self = shift;
1247   if ($self->{transaction_depth} == 1) {
1248     $self->debugobj->txn_commit()
1249       if ($self->debug);
1250     $self->_dbh_commit;
1251     $self->{transaction_depth} = 0
1252       if $self->_dbh_autocommit;
1253   }
1254   elsif($self->{transaction_depth} > 1) {
1255     $self->{transaction_depth}--;
1256     $self->svp_release
1257       if $self->auto_savepoint;
1258   }
1259 }
1260
1261 sub _dbh_commit {
1262   my $self = shift;
1263   my $dbh  = $self->_dbh
1264     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1265   $dbh->commit;
1266 }
1267
1268 sub txn_rollback {
1269   my $self = shift;
1270   my $dbh = $self->_dbh;
1271   eval {
1272     if ($self->{transaction_depth} == 1) {
1273       $self->debugobj->txn_rollback()
1274         if ($self->debug);
1275       $self->{transaction_depth} = 0
1276         if $self->_dbh_autocommit;
1277       $self->_dbh_rollback;
1278     }
1279     elsif($self->{transaction_depth} > 1) {
1280       $self->{transaction_depth}--;
1281       if ($self->auto_savepoint) {
1282         $self->svp_rollback;
1283         $self->svp_release;
1284       }
1285     }
1286     else {
1287       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1288     }
1289   };
1290   if ($@) {
1291     my $error = $@;
1292     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1293     $error =~ /$exception_class/ and $self->throw_exception($error);
1294     # ensure that a failed rollback resets the transaction depth
1295     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1296     $self->throw_exception($error);
1297   }
1298 }
1299
1300 sub _dbh_rollback {
1301   my $self = shift;
1302   my $dbh  = $self->_dbh
1303     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1304   $dbh->rollback;
1305 }
1306
1307 # This used to be the top-half of _execute.  It was split out to make it
1308 #  easier to override in NoBindVars without duping the rest.  It takes up
1309 #  all of _execute's args, and emits $sql, @bind.
1310 sub _prep_for_execute {
1311   my ($self, $op, $extra_bind, $ident, $args) = @_;
1312
1313   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1314     $ident = $ident->from();
1315   }
1316
1317   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1318
1319   unshift(@bind,
1320     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1321       if $extra_bind;
1322   return ($sql, \@bind);
1323 }
1324
1325
1326 sub _fix_bind_params {
1327     my ($self, @bind) = @_;
1328
1329     ### Turn @bind from something like this:
1330     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1331     ### to this:
1332     ###   ( "'1'", "'1'", "'3'" )
1333     return
1334         map {
1335             if ( defined( $_ && $_->[1] ) ) {
1336                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1337             }
1338             else { q{'NULL'}; }
1339         } @bind;
1340 }
1341
1342 sub _query_start {
1343     my ( $self, $sql, @bind ) = @_;
1344
1345     if ( $self->debug ) {
1346         @bind = $self->_fix_bind_params(@bind);
1347
1348         $self->debugobj->query_start( $sql, @bind );
1349     }
1350 }
1351
1352 sub _query_end {
1353     my ( $self, $sql, @bind ) = @_;
1354
1355     if ( $self->debug ) {
1356         @bind = $self->_fix_bind_params(@bind);
1357         $self->debugobj->query_end( $sql, @bind );
1358     }
1359 }
1360
1361 sub _dbh_execute {
1362   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1363
1364   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1365
1366   $self->_query_start( $sql, @$bind );
1367
1368   my $sth = $self->sth($sql,$op);
1369
1370   my $placeholder_index = 1;
1371
1372   foreach my $bound (@$bind) {
1373     my $attributes = {};
1374     my($column_name, @data) = @$bound;
1375
1376     if ($bind_attributes) {
1377       $attributes = $bind_attributes->{$column_name}
1378       if defined $bind_attributes->{$column_name};
1379     }
1380
1381     foreach my $data (@data) {
1382       my $ref = ref $data;
1383       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1384
1385       $sth->bind_param($placeholder_index, $data, $attributes);
1386       $placeholder_index++;
1387     }
1388   }
1389
1390   # Can this fail without throwing an exception anyways???
1391   my $rv = $sth->execute();
1392   $self->throw_exception($sth->errstr) if !$rv;
1393
1394   $self->_query_end( $sql, @$bind );
1395
1396   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1397 }
1398
1399 sub _execute {
1400     my $self = shift;
1401     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1402 }
1403
1404 sub _prefetch_insert_auto_nextvals {
1405   my ($self, $source, $to_insert) = @_;
1406
1407   my $upd = {};
1408
1409   foreach my $col ( $source->columns ) {
1410     if ( !defined $to_insert->{$col} ) {
1411       my $col_info = $source->column_info($col);
1412
1413       if ( $col_info->{auto_nextval} ) {
1414         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1415           'nextval',
1416           $col_info->{sequence} ||=
1417             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1418         );
1419       }
1420     }
1421   }
1422
1423   return $upd;
1424 }
1425
1426 sub insert {
1427   my $self = shift;
1428   my ($source, $to_insert, $opts) = @_;
1429
1430   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1431
1432   my $bind_attributes = $self->source_bind_attributes($source);
1433
1434   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1435
1436   if ($opts->{returning}) {
1437     my @ret_cols = @{$opts->{returning}};
1438
1439     my @ret_vals = eval {
1440       local $SIG{__WARN__} = sub {};
1441       my @r = $sth->fetchrow_array;
1442       $sth->finish;
1443       @r;
1444     };
1445
1446     my %ret;
1447     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1448
1449     $updated_cols = {
1450       %$updated_cols,
1451       %ret,
1452     };
1453   }
1454
1455   return $updated_cols;
1456 }
1457
1458 ## Currently it is assumed that all values passed will be "normal", i.e. not
1459 ## scalar refs, or at least, all the same type as the first set, the statement is
1460 ## only prepped once.
1461 sub insert_bulk {
1462   my ($self, $source, $cols, $data) = @_;
1463
1464   my %colvalues;
1465   @colvalues{@$cols} = (0..$#$cols);
1466
1467   for my $i (0..$#$cols) {
1468     my $first_val = $data->[0][$i];
1469     next unless ref $first_val eq 'SCALAR';
1470
1471     $colvalues{ $cols->[$i] } = $first_val;
1472   }
1473
1474   # check for bad data and stringify stringifiable objects
1475   my $bad_slice = sub {
1476     my ($msg, $col_idx, $slice_idx) = @_;
1477     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1478       $msg,
1479       $cols->[$col_idx],
1480       do {
1481         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1482         Data::Dumper::Concise::Dumper({
1483           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1484         }),
1485       }
1486     );
1487   };
1488
1489   for my $datum_idx (0..$#$data) {
1490     my $datum = $data->[$datum_idx];
1491
1492     for my $col_idx (0..$#$cols) {
1493       my $val            = $datum->[$col_idx];
1494       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1495       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1496
1497       if ($is_literal_sql) {
1498         if (not ref $val) {
1499           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1500         }
1501         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1502           $bad_slice->("$reftype reference found where literal SQL expected",
1503             $col_idx, $datum_idx);
1504         }
1505         elsif ($$val ne $$sqla_bind){
1506           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1507             $col_idx, $datum_idx);
1508         }
1509       }
1510       elsif (my $reftype = ref $val) {
1511         require overload;
1512         if (overload::Method($val, '""')) {
1513           $datum->[$col_idx] = "".$val;
1514         }
1515         else {
1516           $bad_slice->("$reftype reference found where bind expected",
1517             $col_idx, $datum_idx);
1518         }
1519       }
1520     }
1521   }
1522
1523   my ($sql, $bind) = $self->_prep_for_execute (
1524     'insert', undef, $source, [\%colvalues]
1525   );
1526   my @bind = @$bind;
1527
1528   my $empty_bind = 1 if (not @bind) &&
1529     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1530
1531   if ((not @bind) && (not $empty_bind)) {
1532     $self->throw_exception(
1533       'Cannot insert_bulk without support for placeholders'
1534     );
1535   }
1536
1537   # neither _execute_array, nor _execute_inserts_with_no_binds are
1538   # atomic (even if _execute _array is a single call). Thus a safety
1539   # scope guard
1540   my $guard = $self->txn_scope_guard;
1541
1542   $self->_query_start( $sql, ['__BULK__'] );
1543   my $sth = $self->sth($sql);
1544   my $rv = do {
1545     if ($empty_bind) {
1546       # bind_param_array doesn't work if there are no binds
1547       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1548     }
1549     else {
1550 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1551       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1552     }
1553   };
1554
1555   $self->_query_end( $sql, ['__BULK__'] );
1556
1557   $guard->commit;
1558
1559   return (wantarray ? ($rv, $sth, @bind) : $rv);
1560 }
1561
1562 sub _execute_array {
1563   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1564
1565   ## This must be an arrayref, else nothing works!
1566   my $tuple_status = [];
1567
1568   ## Get the bind_attributes, if any exist
1569   my $bind_attributes = $self->source_bind_attributes($source);
1570
1571   ## Bind the values and execute
1572   my $placeholder_index = 1;
1573
1574   foreach my $bound (@$bind) {
1575
1576     my $attributes = {};
1577     my ($column_name, $data_index) = @$bound;
1578
1579     if( $bind_attributes ) {
1580       $attributes = $bind_attributes->{$column_name}
1581       if defined $bind_attributes->{$column_name};
1582     }
1583
1584     my @data = map { $_->[$data_index] } @$data;
1585
1586     $sth->bind_param_array(
1587       $placeholder_index,
1588       [@data],
1589       (%$attributes ?  $attributes : ()),
1590     );
1591     $placeholder_index++;
1592   }
1593
1594   my $rv = eval {
1595     $self->_dbh_execute_array($sth, $tuple_status, @extra);
1596   };
1597   my $err = $@ || $sth->errstr;
1598
1599 # Statement must finish even if there was an exception.
1600   eval { $sth->finish };
1601   $err = $@ unless $err;
1602
1603   if ($err) {
1604     my $i = 0;
1605     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1606
1607     $self->throw_exception("Unexpected populate error: $err")
1608       if ($i > $#$tuple_status);
1609
1610     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1611       ($tuple_status->[$i][1] || $err),
1612       Data::Dumper::Concise::Dumper({
1613         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1614       }),
1615     );
1616   }
1617   return $rv;
1618 }
1619
1620 sub _dbh_execute_array {
1621     my ($self, $sth, $tuple_status, @extra) = @_;
1622
1623     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1624 }
1625
1626 sub _dbh_execute_inserts_with_no_binds {
1627   my ($self, $sth, $count) = @_;
1628
1629   eval {
1630     my $dbh = $self->_get_dbh;
1631     local $dbh->{RaiseError} = 1;
1632     local $dbh->{PrintError} = 0;
1633
1634     $sth->execute foreach 1..$count;
1635   };
1636   my $exception = $@;
1637
1638 # Make sure statement is finished even if there was an exception.
1639   eval { $sth->finish };
1640   $exception = $@ unless $exception;
1641
1642   $self->throw_exception($exception) if $exception;
1643
1644   return $count;
1645 }
1646
1647 sub update {
1648   my ($self, $source, @args) = @_;
1649
1650   my $bind_attrs = $self->source_bind_attributes($source);
1651
1652   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1653 }
1654
1655
1656 sub delete {
1657   my ($self, $source, @args) = @_;
1658
1659   my $bind_attrs = $self->source_bind_attributes($source);
1660
1661   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1662 }
1663
1664 # We were sent here because the $rs contains a complex search
1665 # which will require a subquery to select the correct rows
1666 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1667 #
1668 # Generating a single PK column subquery is trivial and supported
1669 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1670 # Look at _multipk_update_delete()
1671 sub _subq_update_delete {
1672   my $self = shift;
1673   my ($rs, $op, $values) = @_;
1674
1675   my $rsrc = $rs->result_source;
1676
1677   # quick check if we got a sane rs on our hands
1678   my @pcols = $rsrc->_pri_cols;
1679
1680   my $sel = $rs->_resolved_attrs->{select};
1681   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1682
1683   if (
1684       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1685         ne
1686       join ("\x00", sort @$sel )
1687   ) {
1688     $self->throw_exception (
1689       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1690     );
1691   }
1692
1693   if (@pcols == 1) {
1694     return $self->$op (
1695       $rsrc,
1696       $op eq 'update' ? $values : (),
1697       { $pcols[0] => { -in => $rs->as_query } },
1698     );
1699   }
1700
1701   else {
1702     return $self->_multipk_update_delete (@_);
1703   }
1704 }
1705
1706 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1707 # resultset update/delete involving subqueries. So by default resort
1708 # to simple (and inefficient) delete_all style per-row opearations,
1709 # while allowing specific storages to override this with a faster
1710 # implementation.
1711 #
1712 sub _multipk_update_delete {
1713   return shift->_per_row_update_delete (@_);
1714 }
1715
1716 # This is the default loop used to delete/update rows for multi PK
1717 # resultsets, and used by mysql exclusively (because it can't do anything
1718 # else).
1719 #
1720 # We do not use $row->$op style queries, because resultset update/delete
1721 # is not expected to cascade (this is what delete_all/update_all is for).
1722 #
1723 # There should be no race conditions as the entire operation is rolled
1724 # in a transaction.
1725 #
1726 sub _per_row_update_delete {
1727   my $self = shift;
1728   my ($rs, $op, $values) = @_;
1729
1730   my $rsrc = $rs->result_source;
1731   my @pcols = $rsrc->_pri_cols;
1732
1733   my $guard = $self->txn_scope_guard;
1734
1735   # emulate the return value of $sth->execute for non-selects
1736   my $row_cnt = '0E0';
1737
1738   my $subrs_cur = $rs->cursor;
1739   my @all_pk = $subrs_cur->all;
1740   for my $pks ( @all_pk) {
1741
1742     my $cond;
1743     for my $i (0.. $#pcols) {
1744       $cond->{$pcols[$i]} = $pks->[$i];
1745     }
1746
1747     $self->$op (
1748       $rsrc,
1749       $op eq 'update' ? $values : (),
1750       $cond,
1751     );
1752
1753     $row_cnt++;
1754   }
1755
1756   $guard->commit;
1757
1758   return $row_cnt;
1759 }
1760
1761 sub _select {
1762   my $self = shift;
1763
1764   # localization is neccessary as
1765   # 1) there is no infrastructure to pass this around before SQLA2
1766   # 2) _select_args sets it and _prep_for_execute consumes it
1767   my $sql_maker = $self->sql_maker;
1768   local $sql_maker->{_dbic_rs_attrs};
1769
1770   return $self->_execute($self->_select_args(@_));
1771 }
1772
1773 sub _select_args_to_query {
1774   my $self = shift;
1775
1776   # localization is neccessary as
1777   # 1) there is no infrastructure to pass this around before SQLA2
1778   # 2) _select_args sets it and _prep_for_execute consumes it
1779   my $sql_maker = $self->sql_maker;
1780   local $sql_maker->{_dbic_rs_attrs};
1781
1782   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1783   #  = $self->_select_args($ident, $select, $cond, $attrs);
1784   my ($op, $bind, $ident, $bind_attrs, @args) =
1785     $self->_select_args(@_);
1786
1787   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1788   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1789   $prepared_bind ||= [];
1790
1791   return wantarray
1792     ? ($sql, $prepared_bind, $bind_attrs)
1793     : \[ "($sql)", @$prepared_bind ]
1794   ;
1795 }
1796
1797 sub _select_args {
1798   my ($self, $ident, $select, $where, $attrs) = @_;
1799
1800   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1801
1802   my $sql_maker = $self->sql_maker;
1803   $sql_maker->{_dbic_rs_attrs} = {
1804     %$attrs,
1805     select => $select,
1806     from => $ident,
1807     where => $where,
1808     $rs_alias && $alias2source->{$rs_alias}
1809       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1810       : ()
1811     ,
1812   };
1813
1814   # calculate bind_attrs before possible $ident mangling
1815   my $bind_attrs = {};
1816   for my $alias (keys %$alias2source) {
1817     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1818     for my $col (keys %$bindtypes) {
1819
1820       my $fqcn = join ('.', $alias, $col);
1821       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1822
1823       # Unqialified column names are nice, but at the same time can be
1824       # rather ambiguous. What we do here is basically go along with
1825       # the loop, adding an unqualified column slot to $bind_attrs,
1826       # alongside the fully qualified name. As soon as we encounter
1827       # another column by that name (which would imply another table)
1828       # we unset the unqualified slot and never add any info to it
1829       # to avoid erroneous type binding. If this happens the users
1830       # only choice will be to fully qualify his column name
1831
1832       if (exists $bind_attrs->{$col}) {
1833         $bind_attrs->{$col} = {};
1834       }
1835       else {
1836         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1837       }
1838     }
1839   }
1840
1841   # adjust limits
1842   if (
1843     $attrs->{software_limit}
1844       ||
1845     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1846   ) {
1847     $attrs->{software_limit} = 1;
1848   }
1849   else {
1850     $self->throw_exception("rows attribute must be positive if present")
1851       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1852
1853     # MySQL actually recommends this approach.  I cringe.
1854     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1855   }
1856
1857   my @limit;
1858
1859   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
1860   # storage, unless software limit was requested
1861   if (
1862     #limited has_many
1863     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1864        ||
1865     # limited prefetch with RNO subqueries
1866     (
1867       $attrs->{rows}
1868         &&
1869       $sql_maker->limit_dialect eq 'RowNumberOver'
1870         &&
1871       $attrs->{_prefetch_select}
1872         &&
1873       @{$attrs->{_prefetch_select}}
1874     )
1875       ||
1876     # grouped prefetch
1877     ( $attrs->{group_by}
1878         &&
1879       @{$attrs->{group_by}}
1880         &&
1881       $attrs->{_prefetch_select}
1882         &&
1883       @{$attrs->{_prefetch_select}}
1884     )
1885   ) {
1886     ($ident, $select, $where, $attrs)
1887       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1888   }
1889
1890   elsif (
1891     ($attrs->{rows} || $attrs->{offset})
1892       &&
1893     $sql_maker->limit_dialect eq 'RowNumberOver'
1894       &&
1895     (ref $ident eq 'ARRAY' && @$ident > 1)  # indicates a join
1896       &&
1897     scalar $self->_parse_order_by ($attrs->{order_by})
1898   ) {
1899     # the RNO limit dialect above mangles the SQL such that the join gets lost
1900     # wrap a subquery here
1901
1902     push @limit, delete @{$attrs}{qw/rows offset/};
1903
1904     my $subq = $self->_select_args_to_query (
1905       $ident,
1906       $select,
1907       $where,
1908       $attrs,
1909     );
1910
1911     $ident = {
1912       -alias => $attrs->{alias},
1913       -source_handle => $ident->[0]{-source_handle},
1914       $attrs->{alias} => $subq,
1915     };
1916
1917     # all part of the subquery now
1918     delete @{$attrs}{qw/order_by group_by having/};
1919     $where = undef;
1920   }
1921
1922   elsif (! $attrs->{software_limit} ) {
1923     push @limit, $attrs->{rows}, $attrs->{offset};
1924   }
1925
1926   # try to simplify the joinmap further (prune unreferenced type-single joins)
1927   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
1928
1929 ###
1930   # This would be the point to deflate anything found in $where
1931   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1932   # expect a row object. And all we have is a resultsource (it is trivial
1933   # to extract deflator coderefs via $alias2source above).
1934   #
1935   # I don't see a way forward other than changing the way deflators are
1936   # invoked, and that's just bad...
1937 ###
1938
1939   my $order = { map
1940     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1941     (qw/order_by group_by having/ )
1942   };
1943
1944   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1945 }
1946
1947 # Returns a counting SELECT for a simple count
1948 # query. Abstracted so that a storage could override
1949 # this to { count => 'firstcol' } or whatever makes
1950 # sense as a performance optimization
1951 sub _count_select {
1952   #my ($self, $source, $rs_attrs) = @_;
1953   return { count => '*' };
1954 }
1955
1956 # Returns a SELECT which will end up in the subselect
1957 # There may or may not be a group_by, as the subquery
1958 # might have been called to accomodate a limit
1959 #
1960 # Most databases would be happy with whatever ends up
1961 # here, but some choke in various ways.
1962 #
1963 sub _subq_count_select {
1964   my ($self, $source, $rs_attrs) = @_;
1965
1966   if (my $groupby = $rs_attrs->{group_by}) {
1967
1968     my $avail_columns = $self->_resolve_column_info ($rs_attrs->{from});
1969
1970     my $sel_index;
1971     for my $sel (@{$rs_attrs->{select}}) {
1972       if (ref $sel eq 'HASH' and $sel->{-as}) {
1973         $sel_index->{$sel->{-as}} = $sel;
1974       }
1975     }
1976
1977     my @selection;
1978     for my $g_part (@$groupby) {
1979       if (ref $g_part or $avail_columns->{$g_part}) {
1980         push @selection, $g_part;
1981       }
1982       elsif ($sel_index->{$g_part}) {
1983         push @selection, $sel_index->{$g_part};
1984       }
1985       else {
1986         $self->throw_exception ("group_by criteria '$g_part' not contained within current resultset source(s)");
1987       }
1988     }
1989
1990     return \@selection;
1991   }
1992
1993   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1994   return @pcols ? \@pcols : [ 1 ];
1995 }
1996
1997 sub source_bind_attributes {
1998   my ($self, $source) = @_;
1999
2000   my $bind_attributes;
2001   foreach my $column ($source->columns) {
2002
2003     my $data_type = $source->column_info($column)->{data_type} || '';
2004     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2005      if $data_type;
2006   }
2007
2008   return $bind_attributes;
2009 }
2010
2011 =head2 select
2012
2013 =over 4
2014
2015 =item Arguments: $ident, $select, $condition, $attrs
2016
2017 =back
2018
2019 Handle a SQL select statement.
2020
2021 =cut
2022
2023 sub select {
2024   my $self = shift;
2025   my ($ident, $select, $condition, $attrs) = @_;
2026   return $self->cursor_class->new($self, \@_, $attrs);
2027 }
2028
2029 sub select_single {
2030   my $self = shift;
2031   my ($rv, $sth, @bind) = $self->_select(@_);
2032   my @row = $sth->fetchrow_array;
2033   my @nextrow = $sth->fetchrow_array if @row;
2034   if(@row && @nextrow) {
2035     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2036   }
2037   # Need to call finish() to work round broken DBDs
2038   $sth->finish();
2039   return @row;
2040 }
2041
2042 =head2 sth
2043
2044 =over 4
2045
2046 =item Arguments: $sql
2047
2048 =back
2049
2050 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2051
2052 =cut
2053
2054 sub _dbh_sth {
2055   my ($self, $dbh, $sql) = @_;
2056
2057   # 3 is the if_active parameter which avoids active sth re-use
2058   my $sth = $self->disable_sth_caching
2059     ? $dbh->prepare($sql)
2060     : $dbh->prepare_cached($sql, {}, 3);
2061
2062   # XXX You would think RaiseError would make this impossible,
2063   #  but apparently that's not true :(
2064   $self->throw_exception($dbh->errstr) if !$sth;
2065
2066   $sth;
2067 }
2068
2069 sub sth {
2070   my ($self, $sql) = @_;
2071   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2072 }
2073
2074 sub _dbh_columns_info_for {
2075   my ($self, $dbh, $table) = @_;
2076
2077   if ($dbh->can('column_info')) {
2078     my %result;
2079     eval {
2080       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2081       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2082       $sth->execute();
2083       while ( my $info = $sth->fetchrow_hashref() ){
2084         my %column_info;
2085         $column_info{data_type}   = $info->{TYPE_NAME};
2086         $column_info{size}      = $info->{COLUMN_SIZE};
2087         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2088         $column_info{default_value} = $info->{COLUMN_DEF};
2089         my $col_name = $info->{COLUMN_NAME};
2090         $col_name =~ s/^\"(.*)\"$/$1/;
2091
2092         $result{$col_name} = \%column_info;
2093       }
2094     };
2095     return \%result if !$@ && scalar keys %result;
2096   }
2097
2098   my %result;
2099   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2100   $sth->execute;
2101   my @columns = @{$sth->{NAME_lc}};
2102   for my $i ( 0 .. $#columns ){
2103     my %column_info;
2104     $column_info{data_type} = $sth->{TYPE}->[$i];
2105     $column_info{size} = $sth->{PRECISION}->[$i];
2106     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2107
2108     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2109       $column_info{data_type} = $1;
2110       $column_info{size}    = $2;
2111     }
2112
2113     $result{$columns[$i]} = \%column_info;
2114   }
2115   $sth->finish;
2116
2117   foreach my $col (keys %result) {
2118     my $colinfo = $result{$col};
2119     my $type_num = $colinfo->{data_type};
2120     my $type_name;
2121     if(defined $type_num && $dbh->can('type_info')) {
2122       my $type_info = $dbh->type_info($type_num);
2123       $type_name = $type_info->{TYPE_NAME} if $type_info;
2124       $colinfo->{data_type} = $type_name if $type_name;
2125     }
2126   }
2127
2128   return \%result;
2129 }
2130
2131 sub columns_info_for {
2132   my ($self, $table) = @_;
2133   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2134 }
2135
2136 =head2 last_insert_id
2137
2138 Return the row id of the last insert.
2139
2140 =cut
2141
2142 sub _dbh_last_insert_id {
2143     my ($self, $dbh, $source, $col) = @_;
2144
2145     my $id = eval { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2146
2147     return $id if defined $id;
2148
2149     my $class = ref $self;
2150     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2151 }
2152
2153 sub last_insert_id {
2154   my $self = shift;
2155   $self->_dbh_last_insert_id ($self->_dbh, @_);
2156 }
2157
2158 =head2 _native_data_type
2159
2160 =over 4
2161
2162 =item Arguments: $type_name
2163
2164 =back
2165
2166 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2167 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2168 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2169
2170 The default implementation returns C<undef>, implement in your Storage driver if
2171 you need this functionality.
2172
2173 Should map types from other databases to the native RDBMS type, for example
2174 C<VARCHAR2> to C<VARCHAR>.
2175
2176 Types with modifiers should map to the underlying data type. For example,
2177 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2178
2179 Composite types should map to the container type, for example
2180 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2181
2182 =cut
2183
2184 sub _native_data_type {
2185   #my ($self, $data_type) = @_;
2186   return undef
2187 }
2188
2189 # Check if placeholders are supported at all
2190 sub _placeholders_supported {
2191   my $self = shift;
2192   my $dbh  = $self->_get_dbh;
2193
2194   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2195   # but it is inaccurate more often than not
2196   eval {
2197     local $dbh->{PrintError} = 0;
2198     local $dbh->{RaiseError} = 1;
2199     $dbh->do('select ?', {}, 1);
2200   };
2201   return $@ ? 0 : 1;
2202 }
2203
2204 # Check if placeholders bound to non-string types throw exceptions
2205 #
2206 sub _typeless_placeholders_supported {
2207   my $self = shift;
2208   my $dbh  = $self->_get_dbh;
2209
2210   eval {
2211     local $dbh->{PrintError} = 0;
2212     local $dbh->{RaiseError} = 1;
2213     # this specifically tests a bind that is NOT a string
2214     $dbh->do('select 1 where 1 = ?', {}, 1);
2215   };
2216   return $@ ? 0 : 1;
2217 }
2218
2219 =head2 sqlt_type
2220
2221 Returns the database driver name.
2222
2223 =cut
2224
2225 sub sqlt_type {
2226   shift->_get_dbh->{Driver}->{Name};
2227 }
2228
2229 =head2 bind_attribute_by_data_type
2230
2231 Given a datatype from column info, returns a database specific bind
2232 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2233 let the database planner just handle it.
2234
2235 Generally only needed for special case column types, like bytea in postgres.
2236
2237 =cut
2238
2239 sub bind_attribute_by_data_type {
2240     return;
2241 }
2242
2243 =head2 is_datatype_numeric
2244
2245 Given a datatype from column_info, returns a boolean value indicating if
2246 the current RDBMS considers it a numeric value. This controls how
2247 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2248 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2249 be performed instead of the usual C<eq>.
2250
2251 =cut
2252
2253 sub is_datatype_numeric {
2254   my ($self, $dt) = @_;
2255
2256   return 0 unless $dt;
2257
2258   return $dt =~ /^ (?:
2259     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2260   ) $/ix;
2261 }
2262
2263
2264 =head2 create_ddl_dir
2265
2266 =over 4
2267
2268 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2269
2270 =back
2271
2272 Creates a SQL file based on the Schema, for each of the specified
2273 database engines in C<\@databases> in the given directory.
2274 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2275
2276 Given a previous version number, this will also create a file containing
2277 the ALTER TABLE statements to transform the previous schema into the
2278 current one. Note that these statements may contain C<DROP TABLE> or
2279 C<DROP COLUMN> statements that can potentially destroy data.
2280
2281 The file names are created using the C<ddl_filename> method below, please
2282 override this method in your schema if you would like a different file
2283 name format. For the ALTER file, the same format is used, replacing
2284 $version in the name with "$preversion-$version".
2285
2286 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2287 The most common value for this would be C<< { add_drop_table => 1 } >>
2288 to have the SQL produced include a C<DROP TABLE> statement for each table
2289 created. For quoting purposes supply C<quote_table_names> and
2290 C<quote_field_names>.
2291
2292 If no arguments are passed, then the following default values are assumed:
2293
2294 =over 4
2295
2296 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2297
2298 =item version    - $schema->schema_version
2299
2300 =item directory  - './'
2301
2302 =item preversion - <none>
2303
2304 =back
2305
2306 By default, C<\%sqlt_args> will have
2307
2308  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2309
2310 merged with the hash passed in. To disable any of those features, pass in a
2311 hashref like the following
2312
2313  { ignore_constraint_names => 0, # ... other options }
2314
2315
2316 WARNING: You are strongly advised to check all SQL files created, before applying
2317 them.
2318
2319 =cut
2320
2321 sub create_ddl_dir {
2322   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2323
2324   unless ($dir) {
2325     carp "No directory given, using ./\n";
2326     $dir = './';
2327   }
2328
2329   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2330
2331   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2332   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2333
2334   my $schema_version = $schema->schema_version || '1.x';
2335   $version ||= $schema_version;
2336
2337   $sqltargs = {
2338     add_drop_table => 1,
2339     ignore_constraint_names => 1,
2340     ignore_index_names => 1,
2341     %{$sqltargs || {}}
2342   };
2343
2344   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2345     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2346   }
2347
2348   my $sqlt = SQL::Translator->new( $sqltargs );
2349
2350   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2351   my $sqlt_schema = $sqlt->translate({ data => $schema })
2352     or $self->throw_exception ($sqlt->error);
2353
2354   foreach my $db (@$databases) {
2355     $sqlt->reset();
2356     $sqlt->{schema} = $sqlt_schema;
2357     $sqlt->producer($db);
2358
2359     my $file;
2360     my $filename = $schema->ddl_filename($db, $version, $dir);
2361     if (-e $filename && ($version eq $schema_version )) {
2362       # if we are dumping the current version, overwrite the DDL
2363       carp "Overwriting existing DDL file - $filename";
2364       unlink($filename);
2365     }
2366
2367     my $output = $sqlt->translate;
2368     if(!$output) {
2369       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2370       next;
2371     }
2372     if(!open($file, ">$filename")) {
2373       $self->throw_exception("Can't open $filename for writing ($!)");
2374       next;
2375     }
2376     print $file $output;
2377     close($file);
2378
2379     next unless ($preversion);
2380
2381     require SQL::Translator::Diff;
2382
2383     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2384     if(!-e $prefilename) {
2385       carp("No previous schema file found ($prefilename)");
2386       next;
2387     }
2388
2389     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2390     if(-e $difffile) {
2391       carp("Overwriting existing diff file - $difffile");
2392       unlink($difffile);
2393     }
2394
2395     my $source_schema;
2396     {
2397       my $t = SQL::Translator->new($sqltargs);
2398       $t->debug( 0 );
2399       $t->trace( 0 );
2400
2401       $t->parser( $db )
2402         or $self->throw_exception ($t->error);
2403
2404       my $out = $t->translate( $prefilename )
2405         or $self->throw_exception ($t->error);
2406
2407       $source_schema = $t->schema;
2408
2409       $source_schema->name( $prefilename )
2410         unless ( $source_schema->name );
2411     }
2412
2413     # The "new" style of producers have sane normalization and can support
2414     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2415     # And we have to diff parsed SQL against parsed SQL.
2416     my $dest_schema = $sqlt_schema;
2417
2418     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2419       my $t = SQL::Translator->new($sqltargs);
2420       $t->debug( 0 );
2421       $t->trace( 0 );
2422
2423       $t->parser( $db )
2424         or $self->throw_exception ($t->error);
2425
2426       my $out = $t->translate( $filename )
2427         or $self->throw_exception ($t->error);
2428
2429       $dest_schema = $t->schema;
2430
2431       $dest_schema->name( $filename )
2432         unless $dest_schema->name;
2433     }
2434
2435     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2436                                                   $dest_schema,   $db,
2437                                                   $sqltargs
2438                                                  );
2439     if(!open $file, ">$difffile") {
2440       $self->throw_exception("Can't write to $difffile ($!)");
2441       next;
2442     }
2443     print $file $diff;
2444     close($file);
2445   }
2446 }
2447
2448 =head2 deployment_statements
2449
2450 =over 4
2451
2452 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2453
2454 =back
2455
2456 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2457
2458 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2459 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2460
2461 C<$directory> is used to return statements from files in a previously created
2462 L</create_ddl_dir> directory and is optional. The filenames are constructed
2463 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2464
2465 If no C<$directory> is specified then the statements are constructed on the
2466 fly using L<SQL::Translator> and C<$version> is ignored.
2467
2468 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2469
2470 =cut
2471
2472 sub deployment_statements {
2473   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2474   $type ||= $self->sqlt_type;
2475   $version ||= $schema->schema_version || '1.x';
2476   $dir ||= './';
2477   my $filename = $schema->ddl_filename($type, $version, $dir);
2478   if(-f $filename)
2479   {
2480       my $file;
2481       open($file, "<$filename")
2482         or $self->throw_exception("Can't open $filename ($!)");
2483       my @rows = <$file>;
2484       close($file);
2485       return join('', @rows);
2486   }
2487
2488   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2489     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2490   }
2491
2492   # sources needs to be a parser arg, but for simplicty allow at top level
2493   # coming in
2494   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2495       if exists $sqltargs->{sources};
2496
2497   my $tr = SQL::Translator->new(
2498     producer => "SQL::Translator::Producer::${type}",
2499     %$sqltargs,
2500     parser => 'SQL::Translator::Parser::DBIx::Class',
2501     data => $schema,
2502   );
2503
2504   my @ret;
2505   my $wa = wantarray;
2506   if ($wa) {
2507     @ret = $tr->translate;
2508   }
2509   else {
2510     $ret[0] = $tr->translate;
2511   }
2512
2513   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2514     unless (@ret && defined $ret[0]);
2515
2516   return $wa ? @ret : $ret[0];
2517 }
2518
2519 sub deploy {
2520   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2521   my $deploy = sub {
2522     my $line = shift;
2523     return if($line =~ /^--/);
2524     return if(!$line);
2525     # next if($line =~ /^DROP/m);
2526     return if($line =~ /^BEGIN TRANSACTION/m);
2527     return if($line =~ /^COMMIT/m);
2528     return if $line =~ /^\s+$/; # skip whitespace only
2529     $self->_query_start($line);
2530     eval {
2531       # do a dbh_do cycle here, as we need some error checking in
2532       # place (even though we will ignore errors)
2533       $self->dbh_do (sub { $_[1]->do($line) });
2534     };
2535     if ($@) {
2536       carp qq{$@ (running "${line}")};
2537     }
2538     $self->_query_end($line);
2539   };
2540   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2541   if (@statements > 1) {
2542     foreach my $statement (@statements) {
2543       $deploy->( $statement );
2544     }
2545   }
2546   elsif (@statements == 1) {
2547     foreach my $line ( split(";\n", $statements[0])) {
2548       $deploy->( $line );
2549     }
2550   }
2551 }
2552
2553 =head2 datetime_parser
2554
2555 Returns the datetime parser class
2556
2557 =cut
2558
2559 sub datetime_parser {
2560   my $self = shift;
2561   return $self->{datetime_parser} ||= do {
2562     $self->build_datetime_parser(@_);
2563   };
2564 }
2565
2566 =head2 datetime_parser_type
2567
2568 Defines (returns) the datetime parser class - currently hardwired to
2569 L<DateTime::Format::MySQL>
2570
2571 =cut
2572
2573 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2574
2575 =head2 build_datetime_parser
2576
2577 See L</datetime_parser>
2578
2579 =cut
2580
2581 sub build_datetime_parser {
2582   my $self = shift;
2583   my $type = $self->datetime_parser_type(@_);
2584   $self->ensure_class_loaded ($type);
2585   return $type;
2586 }
2587
2588
2589 =head2 is_replicating
2590
2591 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2592 replicate from a master database.  Default is undef, which is the result
2593 returned by databases that don't support replication.
2594
2595 =cut
2596
2597 sub is_replicating {
2598     return;
2599
2600 }
2601
2602 =head2 lag_behind_master
2603
2604 Returns a number that represents a certain amount of lag behind a master db
2605 when a given storage is replicating.  The number is database dependent, but
2606 starts at zero and increases with the amount of lag. Default in undef
2607
2608 =cut
2609
2610 sub lag_behind_master {
2611     return;
2612 }
2613
2614 =head2 relname_to_table_alias
2615
2616 =over 4
2617
2618 =item Arguments: $relname, $join_count
2619
2620 =back
2621
2622 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2623 queries.
2624
2625 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2626 way these aliases are named.
2627
2628 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2629 otherwise C<"$relname">.
2630
2631 =cut
2632
2633 sub relname_to_table_alias {
2634   my ($self, $relname, $join_count) = @_;
2635
2636   my $alias = ($join_count && $join_count > 1 ?
2637     join('_', $relname, $join_count) : $relname);
2638
2639   return $alias;
2640 }
2641
2642 sub DESTROY {
2643   my $self = shift;
2644
2645   $self->_verify_pid if $self->_dbh;
2646
2647   # some databases need this to stop spewing warnings
2648   if (my $dbh = $self->_dbh) {
2649     local $@;
2650     eval {
2651       %{ $dbh->{CachedKids} } = ();
2652       $dbh->disconnect;
2653     };
2654   }
2655
2656   $self->_dbh(undef);
2657 }
2658
2659 1;
2660
2661 =head1 USAGE NOTES
2662
2663 =head2 DBIx::Class and AutoCommit
2664
2665 DBIx::Class can do some wonderful magic with handling exceptions,
2666 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2667 (the default) combined with C<txn_do> for transaction support.
2668
2669 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2670 in an assumed transaction between commits, and you're telling us you'd
2671 like to manage that manually.  A lot of the magic protections offered by
2672 this module will go away.  We can't protect you from exceptions due to database
2673 disconnects because we don't know anything about how to restart your
2674 transactions.  You're on your own for handling all sorts of exceptional
2675 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2676 be with raw DBI.
2677
2678
2679 =head1 AUTHORS
2680
2681 Matt S. Trout <mst@shadowcatsystems.co.uk>
2682
2683 Andy Grundman <andy@hybridized.org>
2684
2685 =head1 LICENSE
2686
2687 You may distribute this code under the same terms as Perl itself.
2688
2689 =cut