auto_nextval support for Firebird
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17 use Sub::Name ();
18
19 __PACKAGE__->mk_group_accessors('simple' =>
20   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
21      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
22 );
23
24 # the values for these accessors are picked out (and deleted) from
25 # the attribute hashref passed to connect_info
26 my @storage_options = qw/
27   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
28   disable_sth_caching unsafe auto_savepoint
29 /;
30 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
31
32
33 # default cursor class, overridable in connect_info attributes
34 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
35
36 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
37 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
38
39
40 # Each of these methods need _determine_driver called before itself
41 # in order to function reliably. This is a purely DRY optimization
42 my @rdbms_specific_methods = qw/
43   deployment_statements
44   sqlt_type
45   build_datetime_parser
46   datetime_parser_type
47
48   insert
49   insert_bulk
50   update
51   delete
52   select
53   select_single
54 /;
55
56 for my $meth (@rdbms_specific_methods) {
57
58   my $orig = __PACKAGE__->can ($meth)
59     or next;
60
61   no strict qw/refs/;
62   no warnings qw/redefine/;
63   *{__PACKAGE__ ."::$meth"} = Sub::Name::subname $meth => sub {
64     if (not $_[0]->_driver_determined) {
65       $_[0]->_determine_driver;
66       goto $_[0]->can($meth);
67     }
68     $orig->(@_);
69   };
70 }
71
72
73 =head1 NAME
74
75 DBIx::Class::Storage::DBI - DBI storage handler
76
77 =head1 SYNOPSIS
78
79   my $schema = MySchema->connect('dbi:SQLite:my.db');
80
81   $schema->storage->debug(1);
82
83   my @stuff = $schema->storage->dbh_do(
84     sub {
85       my ($storage, $dbh, @args) = @_;
86       $dbh->do("DROP TABLE authors");
87     },
88     @column_list
89   );
90
91   $schema->resultset('Book')->search({
92      written_on => $schema->storage->datetime_parser(DateTime->now)
93   });
94
95 =head1 DESCRIPTION
96
97 This class represents the connection to an RDBMS via L<DBI>.  See
98 L<DBIx::Class::Storage> for general information.  This pod only
99 documents DBI-specific methods and behaviors.
100
101 =head1 METHODS
102
103 =cut
104
105 sub new {
106   my $new = shift->next::method(@_);
107
108   $new->transaction_depth(0);
109   $new->_sql_maker_opts({});
110   $new->{savepoints} = [];
111   $new->{_in_dbh_do} = 0;
112   $new->{_dbh_gen} = 0;
113
114   $new;
115 }
116
117 =head2 connect_info
118
119 This method is normally called by L<DBIx::Class::Schema/connection>, which
120 encapsulates its argument list in an arrayref before passing them here.
121
122 The argument list may contain:
123
124 =over
125
126 =item *
127
128 The same 4-element argument set one would normally pass to
129 L<DBI/connect>, optionally followed by
130 L<extra attributes|/DBIx::Class specific connection attributes>
131 recognized by DBIx::Class:
132
133   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
134
135 =item *
136
137 A single code reference which returns a connected
138 L<DBI database handle|DBI/connect> optionally followed by
139 L<extra attributes|/DBIx::Class specific connection attributes> recognized
140 by DBIx::Class:
141
142   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
143
144 =item *
145
146 A single hashref with all the attributes and the dsn/user/password
147 mixed together:
148
149   $connect_info_args = [{
150     dsn => $dsn,
151     user => $user,
152     password => $pass,
153     %dbi_attributes,
154     %extra_attributes,
155   }];
156
157   $connect_info_args = [{
158     dbh_maker => sub { DBI->connect (...) },
159     %dbi_attributes,
160     %extra_attributes,
161   }];
162
163 This is particularly useful for L<Catalyst> based applications, allowing the
164 following config (L<Config::General> style):
165
166   <Model::DB>
167     schema_class   App::DB
168     <connect_info>
169       dsn          dbi:mysql:database=test
170       user         testuser
171       password     TestPass
172       AutoCommit   1
173     </connect_info>
174   </Model::DB>
175
176 The C<dsn>/C<user>/C<password> combination can be substituted by the
177 C<dbh_maker> key whose value is a coderef that returns a connected
178 L<DBI database handle|DBI/connect>
179
180 =back
181
182 Please note that the L<DBI> docs recommend that you always explicitly
183 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
184 recommends that it be set to I<1>, and that you perform transactions
185 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
186 to I<1> if you do not do explicitly set it to zero.  This is the default
187 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
188
189 =head3 DBIx::Class specific connection attributes
190
191 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
192 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
193 the following connection options. These options can be mixed in with your other
194 L<DBI> connection attributes, or placed in a separate hashref
195 (C<\%extra_attributes>) as shown above.
196
197 Every time C<connect_info> is invoked, any previous settings for
198 these options will be cleared before setting the new ones, regardless of
199 whether any options are specified in the new C<connect_info>.
200
201
202 =over
203
204 =item on_connect_do
205
206 Specifies things to do immediately after connecting or re-connecting to
207 the database.  Its value may contain:
208
209 =over
210
211 =item a scalar
212
213 This contains one SQL statement to execute.
214
215 =item an array reference
216
217 This contains SQL statements to execute in order.  Each element contains
218 a string or a code reference that returns a string.
219
220 =item a code reference
221
222 This contains some code to execute.  Unlike code references within an
223 array reference, its return value is ignored.
224
225 =back
226
227 =item on_disconnect_do
228
229 Takes arguments in the same form as L</on_connect_do> and executes them
230 immediately before disconnecting from the database.
231
232 Note, this only runs if you explicitly call L</disconnect> on the
233 storage object.
234
235 =item on_connect_call
236
237 A more generalized form of L</on_connect_do> that calls the specified
238 C<connect_call_METHOD> methods in your storage driver.
239
240   on_connect_do => 'select 1'
241
242 is equivalent to:
243
244   on_connect_call => [ [ do_sql => 'select 1' ] ]
245
246 Its values may contain:
247
248 =over
249
250 =item a scalar
251
252 Will call the C<connect_call_METHOD> method.
253
254 =item a code reference
255
256 Will execute C<< $code->($storage) >>
257
258 =item an array reference
259
260 Each value can be a method name or code reference.
261
262 =item an array of arrays
263
264 For each array, the first item is taken to be the C<connect_call_> method name
265 or code reference, and the rest are parameters to it.
266
267 =back
268
269 Some predefined storage methods you may use:
270
271 =over
272
273 =item do_sql
274
275 Executes a SQL string or a code reference that returns a SQL string. This is
276 what L</on_connect_do> and L</on_disconnect_do> use.
277
278 It can take:
279
280 =over
281
282 =item a scalar
283
284 Will execute the scalar as SQL.
285
286 =item an arrayref
287
288 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
289 attributes hashref and bind values.
290
291 =item a code reference
292
293 Will execute C<< $code->($storage) >> and execute the return array refs as
294 above.
295
296 =back
297
298 =item datetime_setup
299
300 Execute any statements necessary to initialize the database session to return
301 and accept datetime/timestamp values used with
302 L<DBIx::Class::InflateColumn::DateTime>.
303
304 Only necessary for some databases, see your specific storage driver for
305 implementation details.
306
307 =back
308
309 =item on_disconnect_call
310
311 Takes arguments in the same form as L</on_connect_call> and executes them
312 immediately before disconnecting from the database.
313
314 Calls the C<disconnect_call_METHOD> methods as opposed to the
315 C<connect_call_METHOD> methods called by L</on_connect_call>.
316
317 Note, this only runs if you explicitly call L</disconnect> on the
318 storage object.
319
320 =item disable_sth_caching
321
322 If set to a true value, this option will disable the caching of
323 statement handles via L<DBI/prepare_cached>.
324
325 =item limit_dialect
326
327 Sets the limit dialect. This is useful for JDBC-bridge among others
328 where the remote SQL-dialect cannot be determined by the name of the
329 driver alone. See also L<SQL::Abstract::Limit>.
330
331 =item quote_char
332
333 Specifies what characters to use to quote table and column names. If
334 you use this you will want to specify L</name_sep> as well.
335
336 C<quote_char> expects either a single character, in which case is it
337 is placed on either side of the table/column name, or an arrayref of length
338 2 in which case the table/column name is placed between the elements.
339
340 For example under MySQL you should use C<< quote_char => '`' >>, and for
341 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
342
343 =item name_sep
344
345 This only needs to be used in conjunction with C<quote_char>, and is used to
346 specify the character that separates elements (schemas, tables, columns) from
347 each other. In most cases this is simply a C<.>.
348
349 The consequences of not supplying this value is that L<SQL::Abstract>
350 will assume DBIx::Class' uses of aliases to be complete column
351 names. The output will look like I<"me.name"> when it should actually
352 be I<"me"."name">.
353
354 =item unsafe
355
356 This Storage driver normally installs its own C<HandleError>, sets
357 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
358 all database handles, including those supplied by a coderef.  It does this
359 so that it can have consistent and useful error behavior.
360
361 If you set this option to a true value, Storage will not do its usual
362 modifications to the database handle's attributes, and instead relies on
363 the settings in your connect_info DBI options (or the values you set in
364 your connection coderef, in the case that you are connecting via coderef).
365
366 Note that your custom settings can cause Storage to malfunction,
367 especially if you set a C<HandleError> handler that suppresses exceptions
368 and/or disable C<RaiseError>.
369
370 =item auto_savepoint
371
372 If this option is true, L<DBIx::Class> will use savepoints when nesting
373 transactions, making it possible to recover from failure in the inner
374 transaction without having to abort all outer transactions.
375
376 =item cursor_class
377
378 Use this argument to supply a cursor class other than the default
379 L<DBIx::Class::Storage::DBI::Cursor>.
380
381 =back
382
383 Some real-life examples of arguments to L</connect_info> and
384 L<DBIx::Class::Schema/connect>
385
386   # Simple SQLite connection
387   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
388
389   # Connect via subref
390   ->connect_info([ sub { DBI->connect(...) } ]);
391
392   # Connect via subref in hashref
393   ->connect_info([{
394     dbh_maker => sub { DBI->connect(...) },
395     on_connect_do => 'alter session ...',
396   }]);
397
398   # A bit more complicated
399   ->connect_info(
400     [
401       'dbi:Pg:dbname=foo',
402       'postgres',
403       'my_pg_password',
404       { AutoCommit => 1 },
405       { quote_char => q{"}, name_sep => q{.} },
406     ]
407   );
408
409   # Equivalent to the previous example
410   ->connect_info(
411     [
412       'dbi:Pg:dbname=foo',
413       'postgres',
414       'my_pg_password',
415       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
416     ]
417   );
418
419   # Same, but with hashref as argument
420   # See parse_connect_info for explanation
421   ->connect_info(
422     [{
423       dsn         => 'dbi:Pg:dbname=foo',
424       user        => 'postgres',
425       password    => 'my_pg_password',
426       AutoCommit  => 1,
427       quote_char  => q{"},
428       name_sep    => q{.},
429     }]
430   );
431
432   # Subref + DBIx::Class-specific connection options
433   ->connect_info(
434     [
435       sub { DBI->connect(...) },
436       {
437           quote_char => q{`},
438           name_sep => q{@},
439           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
440           disable_sth_caching => 1,
441       },
442     ]
443   );
444
445
446
447 =cut
448
449 sub connect_info {
450   my ($self, $info) = @_;
451
452   return $self->_connect_info if !$info;
453
454   $self->_connect_info($info); # copy for _connect_info
455
456   $info = $self->_normalize_connect_info($info)
457     if ref $info eq 'ARRAY';
458
459   for my $storage_opt (keys %{ $info->{storage_options} }) {
460     my $value = $info->{storage_options}{$storage_opt};
461
462     $self->$storage_opt($value);
463   }
464
465   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
466   #  the new set of options
467   $self->_sql_maker(undef);
468   $self->_sql_maker_opts({});
469
470   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
471     my $value = $info->{sql_maker_options}{$sql_maker_opt};
472
473     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
474   }
475
476   my %attrs = (
477     %{ $self->_default_dbi_connect_attributes || {} },
478     %{ $info->{attributes} || {} },
479   );
480
481   my @args = @{ $info->{arguments} };
482
483   $self->_dbi_connect_info([@args,
484     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
485
486   return $self->_connect_info;
487 }
488
489 sub _normalize_connect_info {
490   my ($self, $info_arg) = @_;
491   my %info;
492
493   my @args = @$info_arg;  # take a shallow copy for further mutilation
494
495   # combine/pre-parse arguments depending on invocation style
496
497   my %attrs;
498   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
499     %attrs = %{ $args[1] || {} };
500     @args = $args[0];
501   }
502   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
503     %attrs = %{$args[0]};
504     @args = ();
505     if (my $code = delete $attrs{dbh_maker}) {
506       @args = $code;
507
508       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
509       if (@ignored) {
510         carp sprintf (
511             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
512           . "to the result of 'dbh_maker'",
513
514           join (', ', map { "'$_'" } (@ignored) ),
515         );
516       }
517     }
518     else {
519       @args = delete @attrs{qw/dsn user password/};
520     }
521   }
522   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
523     %attrs = (
524       % { $args[3] || {} },
525       % { $args[4] || {} },
526     );
527     @args = @args[0,1,2];
528   }
529
530   $info{arguments} = \@args;
531
532   my @storage_opts = grep exists $attrs{$_},
533     @storage_options, 'cursor_class';
534
535   @{ $info{storage_options} }{@storage_opts} =
536     delete @attrs{@storage_opts} if @storage_opts;
537
538   my @sql_maker_opts = grep exists $attrs{$_},
539     qw/limit_dialect quote_char name_sep/;
540
541   @{ $info{sql_maker_options} }{@sql_maker_opts} =
542     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
543
544   $info{attributes} = \%attrs if %attrs;
545
546   return \%info;
547 }
548
549 sub _default_dbi_connect_attributes {
550   return {
551     AutoCommit => 1,
552     RaiseError => 1,
553     PrintError => 0,
554   };
555 }
556
557 =head2 on_connect_do
558
559 This method is deprecated in favour of setting via L</connect_info>.
560
561 =cut
562
563 =head2 on_disconnect_do
564
565 This method is deprecated in favour of setting via L</connect_info>.
566
567 =cut
568
569 sub _parse_connect_do {
570   my ($self, $type) = @_;
571
572   my $val = $self->$type;
573   return () if not defined $val;
574
575   my @res;
576
577   if (not ref($val)) {
578     push @res, [ 'do_sql', $val ];
579   } elsif (ref($val) eq 'CODE') {
580     push @res, $val;
581   } elsif (ref($val) eq 'ARRAY') {
582     push @res, map { [ 'do_sql', $_ ] } @$val;
583   } else {
584     $self->throw_exception("Invalid type for $type: ".ref($val));
585   }
586
587   return \@res;
588 }
589
590 =head2 dbh_do
591
592 Arguments: ($subref | $method_name), @extra_coderef_args?
593
594 Execute the given $subref or $method_name using the new exception-based
595 connection management.
596
597 The first two arguments will be the storage object that C<dbh_do> was called
598 on and a database handle to use.  Any additional arguments will be passed
599 verbatim to the called subref as arguments 2 and onwards.
600
601 Using this (instead of $self->_dbh or $self->dbh) ensures correct
602 exception handling and reconnection (or failover in future subclasses).
603
604 Your subref should have no side-effects outside of the database, as
605 there is the potential for your subref to be partially double-executed
606 if the database connection was stale/dysfunctional.
607
608 Example:
609
610   my @stuff = $schema->storage->dbh_do(
611     sub {
612       my ($storage, $dbh, @cols) = @_;
613       my $cols = join(q{, }, @cols);
614       $dbh->selectrow_array("SELECT $cols FROM foo");
615     },
616     @column_list
617   );
618
619 =cut
620
621 sub dbh_do {
622   my $self = shift;
623   my $code = shift;
624
625   my $dbh = $self->_get_dbh;
626
627   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
628       || $self->{transaction_depth};
629
630   local $self->{_in_dbh_do} = 1;
631
632   my @result;
633   my $want_array = wantarray;
634
635   eval {
636
637     if($want_array) {
638         @result = $self->$code($dbh, @_);
639     }
640     elsif(defined $want_array) {
641         $result[0] = $self->$code($dbh, @_);
642     }
643     else {
644         $self->$code($dbh, @_);
645     }
646   };
647
648   # ->connected might unset $@ - copy
649   my $exception = $@;
650   if(!$exception) { return $want_array ? @result : $result[0] }
651
652   $self->throw_exception($exception) if $self->connected;
653
654   # We were not connected - reconnect and retry, but let any
655   #  exception fall right through this time
656   carp "Retrying $code after catching disconnected exception: $exception"
657     if $ENV{DBIC_DBIRETRY_DEBUG};
658   $self->_populate_dbh;
659   $self->$code($self->_dbh, @_);
660 }
661
662 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
663 # It also informs dbh_do to bypass itself while under the direction of txn_do,
664 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
665 sub txn_do {
666   my $self = shift;
667   my $coderef = shift;
668
669   ref $coderef eq 'CODE' or $self->throw_exception
670     ('$coderef must be a CODE reference');
671
672   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
673
674   local $self->{_in_dbh_do} = 1;
675
676   my @result;
677   my $want_array = wantarray;
678
679   my $tried = 0;
680   while(1) {
681     eval {
682       $self->_get_dbh;
683
684       $self->txn_begin;
685       if($want_array) {
686           @result = $coderef->(@_);
687       }
688       elsif(defined $want_array) {
689           $result[0] = $coderef->(@_);
690       }
691       else {
692           $coderef->(@_);
693       }
694       $self->txn_commit;
695     };
696
697     # ->connected might unset $@ - copy
698     my $exception = $@;
699     if(!$exception) { return $want_array ? @result : $result[0] }
700
701     if($tried++ || $self->connected) {
702       eval { $self->txn_rollback };
703       my $rollback_exception = $@;
704       if($rollback_exception) {
705         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
706         $self->throw_exception($exception)  # propagate nested rollback
707           if $rollback_exception =~ /$exception_class/;
708
709         $self->throw_exception(
710           "Transaction aborted: ${exception}. "
711           . "Rollback failed: ${rollback_exception}"
712         );
713       }
714       $self->throw_exception($exception)
715     }
716
717     # We were not connected, and was first try - reconnect and retry
718     # via the while loop
719     carp "Retrying $coderef after catching disconnected exception: $exception"
720       if $ENV{DBIC_DBIRETRY_DEBUG};
721     $self->_populate_dbh;
722   }
723 }
724
725 =head2 disconnect
726
727 Our C<disconnect> method also performs a rollback first if the
728 database is not in C<AutoCommit> mode.
729
730 =cut
731
732 sub disconnect {
733   my ($self) = @_;
734
735   if( $self->_dbh ) {
736     my @actions;
737
738     push @actions, ( $self->on_disconnect_call || () );
739     push @actions, $self->_parse_connect_do ('on_disconnect_do');
740
741     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
742
743     $self->_dbh_rollback unless $self->_dbh_autocommit;
744
745     %{ $self->_dbh->{CachedKids} } = ();
746     $self->_dbh->disconnect;
747     $self->_dbh(undef);
748     $self->{_dbh_gen}++;
749   }
750 }
751
752 =head2 with_deferred_fk_checks
753
754 =over 4
755
756 =item Arguments: C<$coderef>
757
758 =item Return Value: The return value of $coderef
759
760 =back
761
762 Storage specific method to run the code ref with FK checks deferred or
763 in MySQL's case disabled entirely.
764
765 =cut
766
767 # Storage subclasses should override this
768 sub with_deferred_fk_checks {
769   my ($self, $sub) = @_;
770   $sub->();
771 }
772
773 =head2 connected
774
775 =over
776
777 =item Arguments: none
778
779 =item Return Value: 1|0
780
781 =back
782
783 Verifies that the current database handle is active and ready to execute
784 an SQL statement (e.g. the connection did not get stale, server is still
785 answering, etc.) This method is used internally by L</dbh>.
786
787 =cut
788
789 sub connected {
790   my $self = shift;
791   return 0 unless $self->_seems_connected;
792
793   #be on the safe side
794   local $self->_dbh->{RaiseError} = 1;
795
796   return $self->_ping;
797 }
798
799 sub _seems_connected {
800   my $self = shift;
801
802   my $dbh = $self->_dbh
803     or return 0;
804
805   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
806     $self->_dbh(undef);
807     $self->{_dbh_gen}++;
808     return 0;
809   }
810   else {
811     $self->_verify_pid;
812     return 0 if !$self->_dbh;
813   }
814
815   return $dbh->FETCH('Active');
816 }
817
818 sub _ping {
819   my $self = shift;
820
821   my $dbh = $self->_dbh or return 0;
822
823   return $dbh->ping;
824 }
825
826 # handle pid changes correctly
827 #  NOTE: assumes $self->_dbh is a valid $dbh
828 sub _verify_pid {
829   my ($self) = @_;
830
831   return if defined $self->_conn_pid && $self->_conn_pid == $$;
832
833   $self->_dbh->{InactiveDestroy} = 1;
834   $self->_dbh(undef);
835   $self->{_dbh_gen}++;
836
837   return;
838 }
839
840 sub ensure_connected {
841   my ($self) = @_;
842
843   unless ($self->connected) {
844     $self->_populate_dbh;
845   }
846 }
847
848 =head2 dbh
849
850 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
851 is guaranteed to be healthy by implicitly calling L</connected>, and if
852 necessary performing a reconnection before returning. Keep in mind that this
853 is very B<expensive> on some database engines. Consider using L<dbh_do>
854 instead.
855
856 =cut
857
858 sub dbh {
859   my ($self) = @_;
860
861   if (not $self->_dbh) {
862     $self->_populate_dbh;
863   } else {
864     $self->ensure_connected;
865   }
866   return $self->_dbh;
867 }
868
869 # this is the internal "get dbh or connect (don't check)" method
870 sub _get_dbh {
871   my $self = shift;
872   $self->_verify_pid if $self->_dbh;
873   $self->_populate_dbh unless $self->_dbh;
874   return $self->_dbh;
875 }
876
877 sub _sql_maker_args {
878     my ($self) = @_;
879
880     return (
881       bindtype=>'columns',
882       array_datatypes => 1,
883       limit_dialect => $self->_get_dbh,
884       %{$self->_sql_maker_opts}
885     );
886 }
887
888 sub sql_maker {
889   my ($self) = @_;
890   unless ($self->_sql_maker) {
891     my $sql_maker_class = $self->sql_maker_class;
892     $self->ensure_class_loaded ($sql_maker_class);
893     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
894   }
895   return $self->_sql_maker;
896 }
897
898 # nothing to do by default
899 sub _rebless {}
900 sub _init {}
901
902 sub _populate_dbh {
903   my ($self) = @_;
904
905   my @info = @{$self->_dbi_connect_info || []};
906   $self->_dbh(undef); # in case ->connected failed we might get sent here
907   $self->_dbh($self->_connect(@info));
908
909   $self->_conn_pid($$);
910   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
911
912   $self->_determine_driver;
913
914   # Always set the transaction depth on connect, since
915   #  there is no transaction in progress by definition
916   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
917
918   $self->_run_connection_actions unless $self->{_in_determine_driver};
919 }
920
921 sub _run_connection_actions {
922   my $self = shift;
923   my @actions;
924
925   push @actions, ( $self->on_connect_call || () );
926   push @actions, $self->_parse_connect_do ('on_connect_do');
927
928   $self->_do_connection_actions(connect_call_ => $_) for @actions;
929 }
930
931 sub _determine_driver {
932   my ($self) = @_;
933
934   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
935     my $started_connected = 0;
936     local $self->{_in_determine_driver} = 1;
937
938     if (ref($self) eq __PACKAGE__) {
939       my $driver;
940       if ($self->_dbh) { # we are connected
941         $driver = $self->_dbh->{Driver}{Name};
942         $started_connected = 1;
943       } else {
944         # if connect_info is a CODEREF, we have no choice but to connect
945         if (ref $self->_dbi_connect_info->[0] &&
946             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
947           $self->_populate_dbh;
948           $driver = $self->_dbh->{Driver}{Name};
949         }
950         else {
951           # try to use dsn to not require being connected, the driver may still
952           # force a connection in _rebless to determine version
953           ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
954         }
955       }
956
957       my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
958       if ($self->load_optional_class($storage_class)) {
959         mro::set_mro($storage_class, 'c3');
960         bless $self, $storage_class;
961         $self->_rebless();
962       }
963     }
964
965     $self->_driver_determined(1);
966
967     $self->_init; # run driver-specific initializations
968
969     $self->_run_connection_actions
970         if !$started_connected && defined $self->_dbh;
971   }
972 }
973
974 sub _do_connection_actions {
975   my $self          = shift;
976   my $method_prefix = shift;
977   my $call          = shift;
978
979   if (not ref($call)) {
980     my $method = $method_prefix . $call;
981     $self->$method(@_);
982   } elsif (ref($call) eq 'CODE') {
983     $self->$call(@_);
984   } elsif (ref($call) eq 'ARRAY') {
985     if (ref($call->[0]) ne 'ARRAY') {
986       $self->_do_connection_actions($method_prefix, $_) for @$call;
987     } else {
988       $self->_do_connection_actions($method_prefix, @$_) for @$call;
989     }
990   } else {
991     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
992   }
993
994   return $self;
995 }
996
997 sub connect_call_do_sql {
998   my $self = shift;
999   $self->_do_query(@_);
1000 }
1001
1002 sub disconnect_call_do_sql {
1003   my $self = shift;
1004   $self->_do_query(@_);
1005 }
1006
1007 # override in db-specific backend when necessary
1008 sub connect_call_datetime_setup { 1 }
1009
1010 sub _do_query {
1011   my ($self, $action) = @_;
1012
1013   if (ref $action eq 'CODE') {
1014     $action = $action->($self);
1015     $self->_do_query($_) foreach @$action;
1016   }
1017   else {
1018     # Most debuggers expect ($sql, @bind), so we need to exclude
1019     # the attribute hash which is the second argument to $dbh->do
1020     # furthermore the bind values are usually to be presented
1021     # as named arrayref pairs, so wrap those here too
1022     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1023     my $sql = shift @do_args;
1024     my $attrs = shift @do_args;
1025     my @bind = map { [ undef, $_ ] } @do_args;
1026
1027     $self->_query_start($sql, @bind);
1028     $self->_get_dbh->do($sql, $attrs, @do_args);
1029     $self->_query_end($sql, @bind);
1030   }
1031
1032   return $self;
1033 }
1034
1035 sub _connect {
1036   my ($self, @info) = @_;
1037
1038   $self->throw_exception("You failed to provide any connection info")
1039     if !@info;
1040
1041   my ($old_connect_via, $dbh);
1042
1043   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1044     $old_connect_via = $DBI::connect_via;
1045     $DBI::connect_via = 'connect';
1046   }
1047
1048   eval {
1049     if(ref $info[0] eq 'CODE') {
1050        $dbh = $info[0]->();
1051     }
1052     else {
1053        $dbh = DBI->connect(@info);
1054     }
1055
1056     if($dbh && !$self->unsafe) {
1057       my $weak_self = $self;
1058       Scalar::Util::weaken($weak_self);
1059       $dbh->{HandleError} = sub {
1060           if ($weak_self) {
1061             $weak_self->throw_exception("DBI Exception: $_[0]");
1062           }
1063           else {
1064             # the handler may be invoked by something totally out of
1065             # the scope of DBIC
1066             croak ("DBI Exception: $_[0]");
1067           }
1068       };
1069       $dbh->{ShowErrorStatement} = 1;
1070       $dbh->{RaiseError} = 1;
1071       $dbh->{PrintError} = 0;
1072     }
1073   };
1074
1075   $DBI::connect_via = $old_connect_via if $old_connect_via;
1076
1077   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1078     if !$dbh || $@;
1079
1080   $self->_dbh_autocommit($dbh->{AutoCommit});
1081
1082   $dbh;
1083 }
1084
1085 sub svp_begin {
1086   my ($self, $name) = @_;
1087
1088   $name = $self->_svp_generate_name
1089     unless defined $name;
1090
1091   $self->throw_exception ("You can't use savepoints outside a transaction")
1092     if $self->{transaction_depth} == 0;
1093
1094   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1095     unless $self->can('_svp_begin');
1096
1097   push @{ $self->{savepoints} }, $name;
1098
1099   $self->debugobj->svp_begin($name) if $self->debug;
1100
1101   return $self->_svp_begin($name);
1102 }
1103
1104 sub svp_release {
1105   my ($self, $name) = @_;
1106
1107   $self->throw_exception ("You can't use savepoints outside a transaction")
1108     if $self->{transaction_depth} == 0;
1109
1110   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1111     unless $self->can('_svp_release');
1112
1113   if (defined $name) {
1114     $self->throw_exception ("Savepoint '$name' does not exist")
1115       unless grep { $_ eq $name } @{ $self->{savepoints} };
1116
1117     # Dig through the stack until we find the one we are releasing.  This keeps
1118     # the stack up to date.
1119     my $svp;
1120
1121     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1122   } else {
1123     $name = pop @{ $self->{savepoints} };
1124   }
1125
1126   $self->debugobj->svp_release($name) if $self->debug;
1127
1128   return $self->_svp_release($name);
1129 }
1130
1131 sub svp_rollback {
1132   my ($self, $name) = @_;
1133
1134   $self->throw_exception ("You can't use savepoints outside a transaction")
1135     if $self->{transaction_depth} == 0;
1136
1137   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1138     unless $self->can('_svp_rollback');
1139
1140   if (defined $name) {
1141       # If they passed us a name, verify that it exists in the stack
1142       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1143           $self->throw_exception("Savepoint '$name' does not exist!");
1144       }
1145
1146       # Dig through the stack until we find the one we are releasing.  This keeps
1147       # the stack up to date.
1148       while(my $s = pop(@{ $self->{savepoints} })) {
1149           last if($s eq $name);
1150       }
1151       # Add the savepoint back to the stack, as a rollback doesn't remove the
1152       # named savepoint, only everything after it.
1153       push(@{ $self->{savepoints} }, $name);
1154   } else {
1155       # We'll assume they want to rollback to the last savepoint
1156       $name = $self->{savepoints}->[-1];
1157   }
1158
1159   $self->debugobj->svp_rollback($name) if $self->debug;
1160
1161   return $self->_svp_rollback($name);
1162 }
1163
1164 sub _svp_generate_name {
1165     my ($self) = @_;
1166
1167     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1168 }
1169
1170 sub txn_begin {
1171   my $self = shift;
1172
1173   # this means we have not yet connected and do not know the AC status
1174   # (e.g. coderef $dbh)
1175   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1176
1177   if($self->{transaction_depth} == 0) {
1178     $self->debugobj->txn_begin()
1179       if $self->debug;
1180     $self->_dbh_begin_work;
1181   }
1182   elsif ($self->auto_savepoint) {
1183     $self->svp_begin;
1184   }
1185   $self->{transaction_depth}++;
1186 }
1187
1188 sub _dbh_begin_work {
1189   my $self = shift;
1190
1191   # if the user is utilizing txn_do - good for him, otherwise we need to
1192   # ensure that the $dbh is healthy on BEGIN.
1193   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1194   # will be replaced by a failure of begin_work itself (which will be
1195   # then retried on reconnect)
1196   if ($self->{_in_dbh_do}) {
1197     $self->_dbh->begin_work;
1198   } else {
1199     $self->dbh_do(sub { $_[1]->begin_work });
1200   }
1201 }
1202
1203 sub txn_commit {
1204   my $self = shift;
1205   if ($self->{transaction_depth} == 1) {
1206     $self->debugobj->txn_commit()
1207       if ($self->debug);
1208     $self->_dbh_commit;
1209     $self->{transaction_depth} = 0
1210       if $self->_dbh_autocommit;
1211   }
1212   elsif($self->{transaction_depth} > 1) {
1213     $self->{transaction_depth}--;
1214     $self->svp_release
1215       if $self->auto_savepoint;
1216   }
1217 }
1218
1219 sub _dbh_commit {
1220   my $self = shift;
1221   my $dbh  = $self->_dbh
1222     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1223   $dbh->commit;
1224 }
1225
1226 sub txn_rollback {
1227   my $self = shift;
1228   my $dbh = $self->_dbh;
1229   eval {
1230     if ($self->{transaction_depth} == 1) {
1231       $self->debugobj->txn_rollback()
1232         if ($self->debug);
1233       $self->{transaction_depth} = 0
1234         if $self->_dbh_autocommit;
1235       $self->_dbh_rollback;
1236     }
1237     elsif($self->{transaction_depth} > 1) {
1238       $self->{transaction_depth}--;
1239       if ($self->auto_savepoint) {
1240         $self->svp_rollback;
1241         $self->svp_release;
1242       }
1243     }
1244     else {
1245       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1246     }
1247   };
1248   if ($@) {
1249     my $error = $@;
1250     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1251     $error =~ /$exception_class/ and $self->throw_exception($error);
1252     # ensure that a failed rollback resets the transaction depth
1253     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1254     $self->throw_exception($error);
1255   }
1256 }
1257
1258 sub _dbh_rollback {
1259   my $self = shift;
1260   my $dbh  = $self->_dbh
1261     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1262   $dbh->rollback;
1263 }
1264
1265 # This used to be the top-half of _execute.  It was split out to make it
1266 #  easier to override in NoBindVars without duping the rest.  It takes up
1267 #  all of _execute's args, and emits $sql, @bind.
1268 sub _prep_for_execute {
1269   my ($self, $op, $extra_bind, $ident, $args) = @_;
1270
1271   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1272     $ident = $ident->from();
1273   }
1274
1275   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1276
1277   unshift(@bind,
1278     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1279       if $extra_bind;
1280   return ($sql, \@bind);
1281 }
1282
1283
1284 sub _fix_bind_params {
1285     my ($self, @bind) = @_;
1286
1287     ### Turn @bind from something like this:
1288     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1289     ### to this:
1290     ###   ( "'1'", "'1'", "'3'" )
1291     return
1292         map {
1293             if ( defined( $_ && $_->[1] ) ) {
1294                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1295             }
1296             else { q{'NULL'}; }
1297         } @bind;
1298 }
1299
1300 sub _query_start {
1301     my ( $self, $sql, @bind ) = @_;
1302
1303     if ( $self->debug ) {
1304         @bind = $self->_fix_bind_params(@bind);
1305
1306         $self->debugobj->query_start( $sql, @bind );
1307     }
1308 }
1309
1310 sub _query_end {
1311     my ( $self, $sql, @bind ) = @_;
1312
1313     if ( $self->debug ) {
1314         @bind = $self->_fix_bind_params(@bind);
1315         $self->debugobj->query_end( $sql, @bind );
1316     }
1317 }
1318
1319 sub _dbh_execute {
1320   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1321
1322   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1323
1324   $self->_query_start( $sql, @$bind );
1325
1326   my $sth = $self->sth($sql,$op);
1327
1328   my $placeholder_index = 1;
1329
1330   foreach my $bound (@$bind) {
1331     my $attributes = {};
1332     my($column_name, @data) = @$bound;
1333
1334     if ($bind_attributes) {
1335       $attributes = $bind_attributes->{$column_name}
1336       if defined $bind_attributes->{$column_name};
1337     }
1338
1339     foreach my $data (@data) {
1340       my $ref = ref $data;
1341       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1342
1343       $sth->bind_param($placeholder_index, $data, $attributes);
1344       $placeholder_index++;
1345     }
1346   }
1347
1348   # Can this fail without throwing an exception anyways???
1349   my $rv = $sth->execute();
1350   $self->throw_exception($sth->errstr) if !$rv;
1351
1352   $self->_query_end( $sql, @$bind );
1353
1354   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1355 }
1356
1357 sub _execute {
1358     my $self = shift;
1359     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1360 }
1361
1362 sub insert {
1363   my ($self, $source, $to_insert) = @_;
1364
1365   my $ident = $source->from;
1366   my $bind_attributes = $self->source_bind_attributes($source);
1367
1368   my $updated_cols = {};
1369
1370   foreach my $col ( $source->columns ) {
1371     if ( !defined $to_insert->{$col} ) {
1372       my $col_info = $source->column_info($col);
1373
1374       if ( $col_info->{auto_nextval} ) {
1375         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1376           'nextval',
1377           $col_info->{sequence} ||
1378             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1379         );
1380       }
1381     }
1382   }
1383
1384   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1385
1386   return $updated_cols;
1387 }
1388
1389 ## Currently it is assumed that all values passed will be "normal", i.e. not
1390 ## scalar refs, or at least, all the same type as the first set, the statement is
1391 ## only prepped once.
1392 sub insert_bulk {
1393   my ($self, $source, $cols, $data) = @_;
1394
1395   my %colvalues;
1396   @colvalues{@$cols} = (0..$#$cols);
1397
1398   for my $i (0..$#$cols) {
1399     my $first_val = $data->[0][$i];
1400     next unless ref $first_val eq 'SCALAR';
1401
1402     $colvalues{ $cols->[$i] } = $first_val;
1403   }
1404
1405   # check for bad data and stringify stringifiable objects
1406   my $bad_slice = sub {
1407     my ($msg, $col_idx, $slice_idx) = @_;
1408     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1409       $msg,
1410       $cols->[$col_idx],
1411       do {
1412         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1413         Data::Dumper::Concise::Dumper({
1414           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1415         }),
1416       }
1417     );
1418   };
1419
1420   for my $datum_idx (0..$#$data) {
1421     my $datum = $data->[$datum_idx];
1422
1423     for my $col_idx (0..$#$cols) {
1424       my $val            = $datum->[$col_idx];
1425       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1426       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1427
1428       if ($is_literal_sql) {
1429         if (not ref $val) {
1430           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1431         }
1432         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1433           $bad_slice->("$reftype reference found where literal SQL expected",
1434             $col_idx, $datum_idx);
1435         }
1436         elsif ($$val ne $$sqla_bind){
1437           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1438             $col_idx, $datum_idx);
1439         }
1440       }
1441       elsif (my $reftype = ref $val) {
1442         require overload;
1443         if (overload::Method($val, '""')) {
1444           $datum->[$col_idx] = "".$val;
1445         }
1446         else {
1447           $bad_slice->("$reftype reference found where bind expected",
1448             $col_idx, $datum_idx);
1449         }
1450       }
1451     }
1452   }
1453
1454   my ($sql, $bind) = $self->_prep_for_execute (
1455     'insert', undef, $source, [\%colvalues]
1456   );
1457   my @bind = @$bind;
1458
1459   my $empty_bind = 1 if (not @bind) &&
1460     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1461
1462   if ((not @bind) && (not $empty_bind)) {
1463     $self->throw_exception(
1464       'Cannot insert_bulk without support for placeholders'
1465     );
1466   }
1467
1468   # neither _execute_array, nor _execute_inserts_with_no_binds are
1469   # atomic (even if _execute _array is a single call). Thus a safety
1470   # scope guard
1471   my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
1472
1473   $self->_query_start( $sql, ['__BULK__'] );
1474   my $sth = $self->sth($sql);
1475   my $rv = do {
1476     if ($empty_bind) {
1477       # bind_param_array doesn't work if there are no binds
1478       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1479     }
1480     else {
1481 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1482       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1483     }
1484   };
1485
1486   $self->_query_end( $sql, ['__BULK__'] );
1487
1488
1489   $guard->commit if $guard;
1490
1491   return (wantarray ? ($rv, $sth, @bind) : $rv);
1492 }
1493
1494 sub _execute_array {
1495   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1496
1497   ## This must be an arrayref, else nothing works!
1498   my $tuple_status = [];
1499
1500   ## Get the bind_attributes, if any exist
1501   my $bind_attributes = $self->source_bind_attributes($source);
1502
1503   ## Bind the values and execute
1504   my $placeholder_index = 1;
1505
1506   foreach my $bound (@$bind) {
1507
1508     my $attributes = {};
1509     my ($column_name, $data_index) = @$bound;
1510
1511     if( $bind_attributes ) {
1512       $attributes = $bind_attributes->{$column_name}
1513       if defined $bind_attributes->{$column_name};
1514     }
1515
1516     my @data = map { $_->[$data_index] } @$data;
1517
1518     $sth->bind_param_array(
1519       $placeholder_index,
1520       [@data],
1521       (%$attributes ?  $attributes : ()),
1522     );
1523     $placeholder_index++;
1524   }
1525
1526   my $rv = eval {
1527     $self->_dbh_execute_array($sth, $tuple_status, @extra);
1528   };
1529   my $err = $@ || $sth->errstr;
1530
1531 # Statement must finish even if there was an exception.
1532   eval { $sth->finish };
1533   $err = $@ unless $err;
1534
1535   if ($err) {
1536     my $i = 0;
1537     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1538
1539     $self->throw_exception("Unexpected populate error: $err")
1540       if ($i > $#$tuple_status);
1541
1542     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1543       ($tuple_status->[$i][1] || $err),
1544       Data::Dumper::Concise::Dumper({
1545         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1546       }),
1547     );
1548   }
1549   return $rv;
1550 }
1551
1552 sub _dbh_execute_array {
1553     my ($self, $sth, $tuple_status, @extra) = @_;
1554
1555     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1556 }
1557
1558 sub _dbh_execute_inserts_with_no_binds {
1559   my ($self, $sth, $count) = @_;
1560
1561   eval {
1562     my $dbh = $self->_get_dbh;
1563     local $dbh->{RaiseError} = 1;
1564     local $dbh->{PrintError} = 0;
1565
1566     $sth->execute foreach 1..$count;
1567   };
1568   my $exception = $@;
1569
1570 # Make sure statement is finished even if there was an exception.
1571   eval { $sth->finish };
1572   $exception = $@ unless $exception;
1573
1574   $self->throw_exception($exception) if $exception;
1575
1576   return $count;
1577 }
1578
1579 sub update {
1580   my ($self, $source, @args) = @_;
1581
1582   my $bind_attrs = $self->source_bind_attributes($source);
1583
1584   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1585 }
1586
1587
1588 sub delete {
1589   my ($self, $source, @args) = @_;
1590
1591   my $bind_attrs = $self->source_bind_attributes($source);
1592
1593   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1594 }
1595
1596 # We were sent here because the $rs contains a complex search
1597 # which will require a subquery to select the correct rows
1598 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1599 #
1600 # Generating a single PK column subquery is trivial and supported
1601 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1602 # Look at _multipk_update_delete()
1603 sub _subq_update_delete {
1604   my $self = shift;
1605   my ($rs, $op, $values) = @_;
1606
1607   my $rsrc = $rs->result_source;
1608
1609   # quick check if we got a sane rs on our hands
1610   my @pcols = $rsrc->_pri_cols;
1611
1612   my $sel = $rs->_resolved_attrs->{select};
1613   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1614
1615   if (
1616       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1617         ne
1618       join ("\x00", sort @$sel )
1619   ) {
1620     $self->throw_exception (
1621       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1622     );
1623   }
1624
1625   if (@pcols == 1) {
1626     return $self->$op (
1627       $rsrc,
1628       $op eq 'update' ? $values : (),
1629       { $pcols[0] => { -in => $rs->as_query } },
1630     );
1631   }
1632
1633   else {
1634     return $self->_multipk_update_delete (@_);
1635   }
1636 }
1637
1638 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1639 # resultset update/delete involving subqueries. So by default resort
1640 # to simple (and inefficient) delete_all style per-row opearations,
1641 # while allowing specific storages to override this with a faster
1642 # implementation.
1643 #
1644 sub _multipk_update_delete {
1645   return shift->_per_row_update_delete (@_);
1646 }
1647
1648 # This is the default loop used to delete/update rows for multi PK
1649 # resultsets, and used by mysql exclusively (because it can't do anything
1650 # else).
1651 #
1652 # We do not use $row->$op style queries, because resultset update/delete
1653 # is not expected to cascade (this is what delete_all/update_all is for).
1654 #
1655 # There should be no race conditions as the entire operation is rolled
1656 # in a transaction.
1657 #
1658 sub _per_row_update_delete {
1659   my $self = shift;
1660   my ($rs, $op, $values) = @_;
1661
1662   my $rsrc = $rs->result_source;
1663   my @pcols = $rsrc->_pri_cols;
1664
1665   my $guard = $self->txn_scope_guard;
1666
1667   # emulate the return value of $sth->execute for non-selects
1668   my $row_cnt = '0E0';
1669
1670   my $subrs_cur = $rs->cursor;
1671   my @all_pk = $subrs_cur->all;
1672   for my $pks ( @all_pk) {
1673
1674     my $cond;
1675     for my $i (0.. $#pcols) {
1676       $cond->{$pcols[$i]} = $pks->[$i];
1677     }
1678
1679     $self->$op (
1680       $rsrc,
1681       $op eq 'update' ? $values : (),
1682       $cond,
1683     );
1684
1685     $row_cnt++;
1686   }
1687
1688   $guard->commit;
1689
1690   return $row_cnt;
1691 }
1692
1693 sub _select {
1694   my $self = shift;
1695
1696   # localization is neccessary as
1697   # 1) there is no infrastructure to pass this around before SQLA2
1698   # 2) _select_args sets it and _prep_for_execute consumes it
1699   my $sql_maker = $self->sql_maker;
1700   local $sql_maker->{_dbic_rs_attrs};
1701
1702   return $self->_execute($self->_select_args(@_));
1703 }
1704
1705 sub _select_args_to_query {
1706   my $self = shift;
1707
1708   # localization is neccessary as
1709   # 1) there is no infrastructure to pass this around before SQLA2
1710   # 2) _select_args sets it and _prep_for_execute consumes it
1711   my $sql_maker = $self->sql_maker;
1712   local $sql_maker->{_dbic_rs_attrs};
1713
1714   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1715   #  = $self->_select_args($ident, $select, $cond, $attrs);
1716   my ($op, $bind, $ident, $bind_attrs, @args) =
1717     $self->_select_args(@_);
1718
1719   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1720   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1721   $prepared_bind ||= [];
1722
1723   return wantarray
1724     ? ($sql, $prepared_bind, $bind_attrs)
1725     : \[ "($sql)", @$prepared_bind ]
1726   ;
1727 }
1728
1729 sub _select_args {
1730   my ($self, $ident, $select, $where, $attrs) = @_;
1731
1732   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1733
1734   my $sql_maker = $self->sql_maker;
1735   $sql_maker->{_dbic_rs_attrs} = {
1736     %$attrs,
1737     select => $select,
1738     from => $ident,
1739     where => $where,
1740     $rs_alias && $alias2source->{$rs_alias}
1741       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1742       : ()
1743     ,
1744   };
1745
1746   # calculate bind_attrs before possible $ident mangling
1747   my $bind_attrs = {};
1748   for my $alias (keys %$alias2source) {
1749     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1750     for my $col (keys %$bindtypes) {
1751
1752       my $fqcn = join ('.', $alias, $col);
1753       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1754
1755       # Unqialified column names are nice, but at the same time can be
1756       # rather ambiguous. What we do here is basically go along with
1757       # the loop, adding an unqualified column slot to $bind_attrs,
1758       # alongside the fully qualified name. As soon as we encounter
1759       # another column by that name (which would imply another table)
1760       # we unset the unqualified slot and never add any info to it
1761       # to avoid erroneous type binding. If this happens the users
1762       # only choice will be to fully qualify his column name
1763
1764       if (exists $bind_attrs->{$col}) {
1765         $bind_attrs->{$col} = {};
1766       }
1767       else {
1768         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1769       }
1770     }
1771   }
1772
1773   # adjust limits
1774   if (
1775     $attrs->{software_limit}
1776       ||
1777     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1778   ) {
1779     $attrs->{software_limit} = 1;
1780   }
1781   else {
1782     $self->throw_exception("rows attribute must be positive if present")
1783       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1784
1785     # MySQL actually recommends this approach.  I cringe.
1786     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1787   }
1788
1789   my @limit;
1790
1791   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
1792   # storage, unless software limit was requested
1793   if (
1794     #limited has_many
1795     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1796        ||
1797     # limited prefetch with RNO subqueries
1798     (
1799       $attrs->{rows}
1800         &&
1801       $sql_maker->limit_dialect eq 'RowNumberOver'
1802         &&
1803       $attrs->{_prefetch_select}
1804         &&
1805       @{$attrs->{_prefetch_select}}
1806     )
1807       ||
1808     # grouped prefetch
1809     ( $attrs->{group_by}
1810         &&
1811       @{$attrs->{group_by}}
1812         &&
1813       $attrs->{_prefetch_select}
1814         &&
1815       @{$attrs->{_prefetch_select}}
1816     )
1817   ) {
1818     ($ident, $select, $where, $attrs)
1819       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1820   }
1821
1822   elsif (
1823     ($attrs->{rows} || $attrs->{offset})
1824       &&
1825     $sql_maker->limit_dialect eq 'RowNumberOver'
1826       &&
1827     (ref $ident eq 'ARRAY' && @$ident > 1)  # indicates a join
1828       &&
1829     scalar $self->_parse_order_by ($attrs->{order_by})
1830   ) {
1831     # the RNO limit dialect above mangles the SQL such that the join gets lost
1832     # wrap a subquery here
1833
1834     push @limit, delete @{$attrs}{qw/rows offset/};
1835
1836     my $subq = $self->_select_args_to_query (
1837       $ident,
1838       $select,
1839       $where,
1840       $attrs,
1841     );
1842
1843     $ident = {
1844       -alias => $attrs->{alias},
1845       -source_handle => $ident->[0]{-source_handle},
1846       $attrs->{alias} => $subq,
1847     };
1848
1849     # all part of the subquery now
1850     delete @{$attrs}{qw/order_by group_by having/};
1851     $where = undef;
1852   }
1853
1854   elsif (! $attrs->{software_limit} ) {
1855     push @limit, $attrs->{rows}, $attrs->{offset};
1856   }
1857
1858   # try to simplify the joinmap further (prune unreferenced type-single joins)
1859   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
1860
1861 ###
1862   # This would be the point to deflate anything found in $where
1863   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1864   # expect a row object. And all we have is a resultsource (it is trivial
1865   # to extract deflator coderefs via $alias2source above).
1866   #
1867   # I don't see a way forward other than changing the way deflators are
1868   # invoked, and that's just bad...
1869 ###
1870
1871   my $order = { map
1872     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1873     (qw/order_by group_by having/ )
1874   };
1875
1876   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1877 }
1878
1879 # Returns a counting SELECT for a simple count
1880 # query. Abstracted so that a storage could override
1881 # this to { count => 'firstcol' } or whatever makes
1882 # sense as a performance optimization
1883 sub _count_select {
1884   #my ($self, $source, $rs_attrs) = @_;
1885   return { count => '*' };
1886 }
1887
1888 # Returns a SELECT which will end up in the subselect
1889 # There may or may not be a group_by, as the subquery
1890 # might have been called to accomodate a limit
1891 #
1892 # Most databases would be happy with whatever ends up
1893 # here, but some choke in various ways.
1894 #
1895 sub _subq_count_select {
1896   my ($self, $source, $rs_attrs) = @_;
1897
1898   if (my $groupby = $rs_attrs->{group_by}) {
1899
1900     my $avail_columns = $self->_resolve_column_info ($rs_attrs->{from});
1901
1902     my $sel_index;
1903     for my $sel (@{$rs_attrs->{select}}) {
1904       if (ref $sel eq 'HASH' and $sel->{-as}) {
1905         $sel_index->{$sel->{-as}} = $sel;
1906       }
1907     }
1908
1909     my @selection;
1910     for my $g_part (@$groupby) {
1911       if (ref $g_part or $avail_columns->{$g_part}) {
1912         push @selection, $g_part;
1913       }
1914       elsif ($sel_index->{$g_part}) {
1915         push @selection, $sel_index->{$g_part};
1916       }
1917       else {
1918         $self->throw_exception ("group_by criteria '$g_part' not contained within current resultset source(s)");
1919       }
1920     }
1921
1922     return \@selection;
1923   }
1924
1925   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1926   return @pcols ? \@pcols : [ 1 ];
1927 }
1928
1929 sub source_bind_attributes {
1930   my ($self, $source) = @_;
1931
1932   my $bind_attributes;
1933   foreach my $column ($source->columns) {
1934
1935     my $data_type = $source->column_info($column)->{data_type} || '';
1936     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1937      if $data_type;
1938   }
1939
1940   return $bind_attributes;
1941 }
1942
1943 =head2 select
1944
1945 =over 4
1946
1947 =item Arguments: $ident, $select, $condition, $attrs
1948
1949 =back
1950
1951 Handle a SQL select statement.
1952
1953 =cut
1954
1955 sub select {
1956   my $self = shift;
1957   my ($ident, $select, $condition, $attrs) = @_;
1958   return $self->cursor_class->new($self, \@_, $attrs);
1959 }
1960
1961 sub select_single {
1962   my $self = shift;
1963   my ($rv, $sth, @bind) = $self->_select(@_);
1964   my @row = $sth->fetchrow_array;
1965   my @nextrow = $sth->fetchrow_array if @row;
1966   if(@row && @nextrow) {
1967     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1968   }
1969   # Need to call finish() to work round broken DBDs
1970   $sth->finish();
1971   return @row;
1972 }
1973
1974 =head2 sth
1975
1976 =over 4
1977
1978 =item Arguments: $sql
1979
1980 =back
1981
1982 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1983
1984 =cut
1985
1986 sub _dbh_sth {
1987   my ($self, $dbh, $sql) = @_;
1988
1989   # 3 is the if_active parameter which avoids active sth re-use
1990   my $sth = $self->disable_sth_caching
1991     ? $dbh->prepare($sql)
1992     : $dbh->prepare_cached($sql, {}, 3);
1993
1994   # XXX You would think RaiseError would make this impossible,
1995   #  but apparently that's not true :(
1996   $self->throw_exception($dbh->errstr) if !$sth;
1997
1998   $sth;
1999 }
2000
2001 sub sth {
2002   my ($self, $sql) = @_;
2003   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2004 }
2005
2006 sub _dbh_columns_info_for {
2007   my ($self, $dbh, $table) = @_;
2008
2009   if ($dbh->can('column_info')) {
2010     my %result;
2011     eval {
2012       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2013       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2014       $sth->execute();
2015       while ( my $info = $sth->fetchrow_hashref() ){
2016         my %column_info;
2017         $column_info{data_type}   = $info->{TYPE_NAME};
2018         $column_info{size}      = $info->{COLUMN_SIZE};
2019         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2020         $column_info{default_value} = $info->{COLUMN_DEF};
2021         my $col_name = $info->{COLUMN_NAME};
2022         $col_name =~ s/^\"(.*)\"$/$1/;
2023
2024         $result{$col_name} = \%column_info;
2025       }
2026     };
2027     return \%result if !$@ && scalar keys %result;
2028   }
2029
2030   my %result;
2031   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2032   $sth->execute;
2033   my @columns = @{$sth->{NAME_lc}};
2034   for my $i ( 0 .. $#columns ){
2035     my %column_info;
2036     $column_info{data_type} = $sth->{TYPE}->[$i];
2037     $column_info{size} = $sth->{PRECISION}->[$i];
2038     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2039
2040     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2041       $column_info{data_type} = $1;
2042       $column_info{size}    = $2;
2043     }
2044
2045     $result{$columns[$i]} = \%column_info;
2046   }
2047   $sth->finish;
2048
2049   foreach my $col (keys %result) {
2050     my $colinfo = $result{$col};
2051     my $type_num = $colinfo->{data_type};
2052     my $type_name;
2053     if(defined $type_num && $dbh->can('type_info')) {
2054       my $type_info = $dbh->type_info($type_num);
2055       $type_name = $type_info->{TYPE_NAME} if $type_info;
2056       $colinfo->{data_type} = $type_name if $type_name;
2057     }
2058   }
2059
2060   return \%result;
2061 }
2062
2063 sub columns_info_for {
2064   my ($self, $table) = @_;
2065   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2066 }
2067
2068 =head2 last_insert_id
2069
2070 Return the row id of the last insert.
2071
2072 =cut
2073
2074 sub _dbh_last_insert_id {
2075     my ($self, $dbh, $source, $col) = @_;
2076
2077     my $id = eval { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2078
2079     return $id if defined $id;
2080
2081     my $class = ref $self;
2082     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2083 }
2084
2085 sub last_insert_id {
2086   my $self = shift;
2087   $self->_dbh_last_insert_id ($self->_dbh, @_);
2088 }
2089
2090 =head2 _native_data_type
2091
2092 =over 4
2093
2094 =item Arguments: $type_name
2095
2096 =back
2097
2098 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2099 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2100 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2101
2102 The default implementation returns C<undef>, implement in your Storage driver if
2103 you need this functionality.
2104
2105 Should map types from other databases to the native RDBMS type, for example
2106 C<VARCHAR2> to C<VARCHAR>.
2107
2108 Types with modifiers should map to the underlying data type. For example,
2109 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2110
2111 Composite types should map to the container type, for example
2112 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2113
2114 =cut
2115
2116 sub _native_data_type {
2117   #my ($self, $data_type) = @_;
2118   return undef
2119 }
2120
2121 # Check if placeholders are supported at all
2122 sub _placeholders_supported {
2123   my $self = shift;
2124   my $dbh  = $self->_get_dbh;
2125
2126   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2127   # but it is inaccurate more often than not
2128   eval {
2129     local $dbh->{PrintError} = 0;
2130     local $dbh->{RaiseError} = 1;
2131     $dbh->do('select ?', {}, 1);
2132   };
2133   return $@ ? 0 : 1;
2134 }
2135
2136 # Check if placeholders bound to non-string types throw exceptions
2137 #
2138 sub _typeless_placeholders_supported {
2139   my $self = shift;
2140   my $dbh  = $self->_get_dbh;
2141
2142   eval {
2143     local $dbh->{PrintError} = 0;
2144     local $dbh->{RaiseError} = 1;
2145     # this specifically tests a bind that is NOT a string
2146     $dbh->do('select 1 where 1 = ?', {}, 1);
2147   };
2148   return $@ ? 0 : 1;
2149 }
2150
2151 =head2 sqlt_type
2152
2153 Returns the database driver name.
2154
2155 =cut
2156
2157 sub sqlt_type {
2158   shift->_get_dbh->{Driver}->{Name};
2159 }
2160
2161 =head2 bind_attribute_by_data_type
2162
2163 Given a datatype from column info, returns a database specific bind
2164 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2165 let the database planner just handle it.
2166
2167 Generally only needed for special case column types, like bytea in postgres.
2168
2169 =cut
2170
2171 sub bind_attribute_by_data_type {
2172     return;
2173 }
2174
2175 =head2 is_datatype_numeric
2176
2177 Given a datatype from column_info, returns a boolean value indicating if
2178 the current RDBMS considers it a numeric value. This controls how
2179 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2180 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2181 be performed instead of the usual C<eq>.
2182
2183 =cut
2184
2185 sub is_datatype_numeric {
2186   my ($self, $dt) = @_;
2187
2188   return 0 unless $dt;
2189
2190   return $dt =~ /^ (?:
2191     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2192   ) $/ix;
2193 }
2194
2195
2196 =head2 create_ddl_dir
2197
2198 =over 4
2199
2200 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2201
2202 =back
2203
2204 Creates a SQL file based on the Schema, for each of the specified
2205 database engines in C<\@databases> in the given directory.
2206 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2207
2208 Given a previous version number, this will also create a file containing
2209 the ALTER TABLE statements to transform the previous schema into the
2210 current one. Note that these statements may contain C<DROP TABLE> or
2211 C<DROP COLUMN> statements that can potentially destroy data.
2212
2213 The file names are created using the C<ddl_filename> method below, please
2214 override this method in your schema if you would like a different file
2215 name format. For the ALTER file, the same format is used, replacing
2216 $version in the name with "$preversion-$version".
2217
2218 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2219 The most common value for this would be C<< { add_drop_table => 1 } >>
2220 to have the SQL produced include a C<DROP TABLE> statement for each table
2221 created. For quoting purposes supply C<quote_table_names> and
2222 C<quote_field_names>.
2223
2224 If no arguments are passed, then the following default values are assumed:
2225
2226 =over 4
2227
2228 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2229
2230 =item version    - $schema->schema_version
2231
2232 =item directory  - './'
2233
2234 =item preversion - <none>
2235
2236 =back
2237
2238 By default, C<\%sqlt_args> will have
2239
2240  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2241
2242 merged with the hash passed in. To disable any of those features, pass in a
2243 hashref like the following
2244
2245  { ignore_constraint_names => 0, # ... other options }
2246
2247
2248 WARNING: You are strongly advised to check all SQL files created, before applying
2249 them.
2250
2251 =cut
2252
2253 sub create_ddl_dir {
2254   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2255
2256   if(!$dir || !-d $dir) {
2257     carp "No directory given, using ./\n";
2258     $dir = "./";
2259   }
2260   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2261   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2262
2263   my $schema_version = $schema->schema_version || '1.x';
2264   $version ||= $schema_version;
2265
2266   $sqltargs = {
2267     add_drop_table => 1,
2268     ignore_constraint_names => 1,
2269     ignore_index_names => 1,
2270     %{$sqltargs || {}}
2271   };
2272
2273   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2274     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2275   }
2276
2277   my $sqlt = SQL::Translator->new( $sqltargs );
2278
2279   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2280   my $sqlt_schema = $sqlt->translate({ data => $schema })
2281     or $self->throw_exception ($sqlt->error);
2282
2283   foreach my $db (@$databases) {
2284     $sqlt->reset();
2285     $sqlt->{schema} = $sqlt_schema;
2286     $sqlt->producer($db);
2287
2288     my $file;
2289     my $filename = $schema->ddl_filename($db, $version, $dir);
2290     if (-e $filename && ($version eq $schema_version )) {
2291       # if we are dumping the current version, overwrite the DDL
2292       carp "Overwriting existing DDL file - $filename";
2293       unlink($filename);
2294     }
2295
2296     my $output = $sqlt->translate;
2297     if(!$output) {
2298       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2299       next;
2300     }
2301     if(!open($file, ">$filename")) {
2302       $self->throw_exception("Can't open $filename for writing ($!)");
2303       next;
2304     }
2305     print $file $output;
2306     close($file);
2307
2308     next unless ($preversion);
2309
2310     require SQL::Translator::Diff;
2311
2312     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2313     if(!-e $prefilename) {
2314       carp("No previous schema file found ($prefilename)");
2315       next;
2316     }
2317
2318     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2319     if(-e $difffile) {
2320       carp("Overwriting existing diff file - $difffile");
2321       unlink($difffile);
2322     }
2323
2324     my $source_schema;
2325     {
2326       my $t = SQL::Translator->new($sqltargs);
2327       $t->debug( 0 );
2328       $t->trace( 0 );
2329
2330       $t->parser( $db )
2331         or $self->throw_exception ($t->error);
2332
2333       my $out = $t->translate( $prefilename )
2334         or $self->throw_exception ($t->error);
2335
2336       $source_schema = $t->schema;
2337
2338       $source_schema->name( $prefilename )
2339         unless ( $source_schema->name );
2340     }
2341
2342     # The "new" style of producers have sane normalization and can support
2343     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2344     # And we have to diff parsed SQL against parsed SQL.
2345     my $dest_schema = $sqlt_schema;
2346
2347     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2348       my $t = SQL::Translator->new($sqltargs);
2349       $t->debug( 0 );
2350       $t->trace( 0 );
2351
2352       $t->parser( $db )
2353         or $self->throw_exception ($t->error);
2354
2355       my $out = $t->translate( $filename )
2356         or $self->throw_exception ($t->error);
2357
2358       $dest_schema = $t->schema;
2359
2360       $dest_schema->name( $filename )
2361         unless $dest_schema->name;
2362     }
2363
2364     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2365                                                   $dest_schema,   $db,
2366                                                   $sqltargs
2367                                                  );
2368     if(!open $file, ">$difffile") {
2369       $self->throw_exception("Can't write to $difffile ($!)");
2370       next;
2371     }
2372     print $file $diff;
2373     close($file);
2374   }
2375 }
2376
2377 =head2 deployment_statements
2378
2379 =over 4
2380
2381 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2382
2383 =back
2384
2385 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2386
2387 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2388 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2389
2390 C<$directory> is used to return statements from files in a previously created
2391 L</create_ddl_dir> directory and is optional. The filenames are constructed
2392 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2393
2394 If no C<$directory> is specified then the statements are constructed on the
2395 fly using L<SQL::Translator> and C<$version> is ignored.
2396
2397 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2398
2399 =cut
2400
2401 sub deployment_statements {
2402   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2403   $type ||= $self->sqlt_type;
2404   $version ||= $schema->schema_version || '1.x';
2405   $dir ||= './';
2406   my $filename = $schema->ddl_filename($type, $version, $dir);
2407   if(-f $filename)
2408   {
2409       my $file;
2410       open($file, "<$filename")
2411         or $self->throw_exception("Can't open $filename ($!)");
2412       my @rows = <$file>;
2413       close($file);
2414       return join('', @rows);
2415   }
2416
2417   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2418     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2419   }
2420
2421   # sources needs to be a parser arg, but for simplicty allow at top level
2422   # coming in
2423   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2424       if exists $sqltargs->{sources};
2425
2426   my $tr = SQL::Translator->new(
2427     producer => "SQL::Translator::Producer::${type}",
2428     %$sqltargs,
2429     parser => 'SQL::Translator::Parser::DBIx::Class',
2430     data => $schema,
2431   );
2432
2433   my @ret;
2434   my $wa = wantarray;
2435   if ($wa) {
2436     @ret = $tr->translate;
2437   }
2438   else {
2439     $ret[0] = $tr->translate;
2440   }
2441
2442   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2443     unless (@ret && defined $ret[0]);
2444
2445   return $wa ? @ret : $ret[0];
2446 }
2447
2448 sub deploy {
2449   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2450   my $deploy = sub {
2451     my $line = shift;
2452     return if($line =~ /^--/);
2453     return if(!$line);
2454     # next if($line =~ /^DROP/m);
2455     return if($line =~ /^BEGIN TRANSACTION/m);
2456     return if($line =~ /^COMMIT/m);
2457     return if $line =~ /^\s+$/; # skip whitespace only
2458     $self->_query_start($line);
2459     eval {
2460       # do a dbh_do cycle here, as we need some error checking in
2461       # place (even though we will ignore errors)
2462       $self->dbh_do (sub { $_[1]->do($line) });
2463     };
2464     if ($@) {
2465       carp qq{$@ (running "${line}")};
2466     }
2467     $self->_query_end($line);
2468   };
2469   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2470   if (@statements > 1) {
2471     foreach my $statement (@statements) {
2472       $deploy->( $statement );
2473     }
2474   }
2475   elsif (@statements == 1) {
2476     foreach my $line ( split(";\n", $statements[0])) {
2477       $deploy->( $line );
2478     }
2479   }
2480 }
2481
2482 =head2 datetime_parser
2483
2484 Returns the datetime parser class
2485
2486 =cut
2487
2488 sub datetime_parser {
2489   my $self = shift;
2490   return $self->{datetime_parser} ||= do {
2491     $self->build_datetime_parser(@_);
2492   };
2493 }
2494
2495 =head2 datetime_parser_type
2496
2497 Defines (returns) the datetime parser class - currently hardwired to
2498 L<DateTime::Format::MySQL>
2499
2500 =cut
2501
2502 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2503
2504 =head2 build_datetime_parser
2505
2506 See L</datetime_parser>
2507
2508 =cut
2509
2510 sub build_datetime_parser {
2511   my $self = shift;
2512   my $type = $self->datetime_parser_type(@_);
2513   $self->ensure_class_loaded ($type);
2514   return $type;
2515 }
2516
2517
2518 =head2 is_replicating
2519
2520 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2521 replicate from a master database.  Default is undef, which is the result
2522 returned by databases that don't support replication.
2523
2524 =cut
2525
2526 sub is_replicating {
2527     return;
2528
2529 }
2530
2531 =head2 lag_behind_master
2532
2533 Returns a number that represents a certain amount of lag behind a master db
2534 when a given storage is replicating.  The number is database dependent, but
2535 starts at zero and increases with the amount of lag. Default in undef
2536
2537 =cut
2538
2539 sub lag_behind_master {
2540     return;
2541 }
2542
2543 =head2 relname_to_table_alias
2544
2545 =over 4
2546
2547 =item Arguments: $relname, $join_count
2548
2549 =back
2550
2551 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2552 queries.
2553
2554 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2555 way these aliases are named.
2556
2557 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2558 otherwise C<"$relname">.
2559
2560 =cut
2561
2562 sub relname_to_table_alias {
2563   my ($self, $relname, $join_count) = @_;
2564
2565   my $alias = ($join_count && $join_count > 1 ?
2566     join('_', $relname, $join_count) : $relname);
2567
2568   return $alias;
2569 }
2570
2571 sub DESTROY {
2572   my $self = shift;
2573
2574   $self->_verify_pid if $self->_dbh;
2575
2576   # some databases need this to stop spewing warnings
2577   if (my $dbh = $self->_dbh) {
2578     local $@;
2579     eval {
2580       %{ $dbh->{CachedKids} } = ();
2581       $dbh->disconnect;
2582     };
2583   }
2584
2585   $self->_dbh(undef);
2586 }
2587
2588 1;
2589
2590 =head1 USAGE NOTES
2591
2592 =head2 DBIx::Class and AutoCommit
2593
2594 DBIx::Class can do some wonderful magic with handling exceptions,
2595 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2596 (the default) combined with C<txn_do> for transaction support.
2597
2598 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2599 in an assumed transaction between commits, and you're telling us you'd
2600 like to manage that manually.  A lot of the magic protections offered by
2601 this module will go away.  We can't protect you from exceptions due to database
2602 disconnects because we don't know anything about how to restart your
2603 transactions.  You're on your own for handling all sorts of exceptional
2604 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2605 be with raw DBI.
2606
2607
2608 =head1 AUTHORS
2609
2610 Matt S. Trout <mst@shadowcatsystems.co.uk>
2611
2612 Andy Grundman <andy@hybridized.org>
2613
2614 =head1 LICENSE
2615
2616 You may distribute this code under the same terms as Perl itself.
2617
2618 =cut