Fix Top-limit problem of missed bindvars
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17 use Sub::Name ();
18 use Try::Tiny;
19 use File::Path ();
20 use namespace::clean;
21
22 __PACKAGE__->mk_group_accessors('simple' =>
23   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
24      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints
25      _server_info_hash/
26 );
27
28 # the values for these accessors are picked out (and deleted) from
29 # the attribute hashref passed to connect_info
30 my @storage_options = qw/
31   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
32   disable_sth_caching unsafe auto_savepoint
33 /;
34 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
35
36
37 # default cursor class, overridable in connect_info attributes
38 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
39
40 __PACKAGE__->mk_group_accessors('inherited' => qw/
41   sql_maker_class
42   _supports_insert_returning
43 /);
44 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
45
46 # Each of these methods need _determine_driver called before itself
47 # in order to function reliably. This is a purely DRY optimization
48 my @rdbms_specific_methods = qw/
49   deployment_statements
50   sqlt_type
51   sql_maker
52   build_datetime_parser
53   datetime_parser_type
54
55   insert
56   insert_bulk
57   update
58   delete
59   select
60   select_single
61 /;
62
63 for my $meth (@rdbms_specific_methods) {
64
65   my $orig = __PACKAGE__->can ($meth)
66     or next;
67
68   no strict qw/refs/;
69   no warnings qw/redefine/;
70   *{__PACKAGE__ ."::$meth"} = Sub::Name::subname $meth => sub {
71     if (not $_[0]->_driver_determined) {
72       $_[0]->_determine_driver;
73       goto $_[0]->can($meth);
74     }
75     $orig->(@_);
76   };
77 }
78
79
80 =head1 NAME
81
82 DBIx::Class::Storage::DBI - DBI storage handler
83
84 =head1 SYNOPSIS
85
86   my $schema = MySchema->connect('dbi:SQLite:my.db');
87
88   $schema->storage->debug(1);
89
90   my @stuff = $schema->storage->dbh_do(
91     sub {
92       my ($storage, $dbh, @args) = @_;
93       $dbh->do("DROP TABLE authors");
94     },
95     @column_list
96   );
97
98   $schema->resultset('Book')->search({
99      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
100   });
101
102 =head1 DESCRIPTION
103
104 This class represents the connection to an RDBMS via L<DBI>.  See
105 L<DBIx::Class::Storage> for general information.  This pod only
106 documents DBI-specific methods and behaviors.
107
108 =head1 METHODS
109
110 =cut
111
112 sub new {
113   my $new = shift->next::method(@_);
114
115   $new->transaction_depth(0);
116   $new->_sql_maker_opts({});
117   $new->{savepoints} = [];
118   $new->{_in_dbh_do} = 0;
119   $new->{_dbh_gen} = 0;
120
121   # read below to see what this does
122   $new->_arm_global_destructor;
123
124   $new;
125 }
126
127 # This is hack to work around perl shooting stuff in random
128 # order on exit(). If we do not walk the remaining storage
129 # objects in an END block, there is a *small but real* chance
130 # of a fork()ed child to kill the parent's shared DBI handle,
131 # *before perl reaches the DESTROY in this package*
132 # Yes, it is ugly and effective.
133 {
134   my %seek_and_destroy;
135
136   sub _arm_global_destructor {
137     my $self = shift;
138     my $key = Scalar::Util::refaddr ($self);
139     $seek_and_destroy{$key} = $self;
140     Scalar::Util::weaken ($seek_and_destroy{$key});
141   }
142
143   END {
144     local $?; # just in case the DBI destructor changes it somehow
145
146     # destroy just the object if not native to this process/thread
147     $_->_preserve_foreign_dbh for (grep
148       { defined $_ }
149       values %seek_and_destroy
150     );
151   }
152 }
153
154 sub DESTROY {
155   my $self = shift;
156
157   # destroy just the object if not native to this process/thread
158   $self->_preserve_foreign_dbh;
159
160   # some databases need this to stop spewing warnings
161   if (my $dbh = $self->_dbh) {
162     try {
163       %{ $dbh->{CachedKids} } = ();
164       $dbh->disconnect;
165     };
166   }
167
168   $self->_dbh(undef);
169 }
170
171 sub _preserve_foreign_dbh {
172   my $self = shift;
173
174   return unless $self->_dbh;
175
176   $self->_verify_tid;
177
178   return unless $self->_dbh;
179
180   $self->_verify_pid;
181
182 }
183
184 # handle pid changes correctly - do not destroy parent's connection
185 sub _verify_pid {
186   my $self = shift;
187
188   return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
189
190   $self->_dbh->{InactiveDestroy} = 1;
191   $self->_dbh(undef);
192   $self->{_dbh_gen}++;
193
194   return;
195 }
196
197 # very similar to above, but seems to FAIL if I set InactiveDestroy
198 sub _verify_tid {
199   my $self = shift;
200
201   if ( ! defined $self->_conn_tid ) {
202     return; # no threads
203   }
204   elsif ( $self->_conn_tid == threads->tid ) {
205     return; # same thread
206   }
207
208   #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
209   $self->_dbh(undef);
210   $self->{_dbh_gen}++;
211
212   return;
213 }
214
215
216 =head2 connect_info
217
218 This method is normally called by L<DBIx::Class::Schema/connection>, which
219 encapsulates its argument list in an arrayref before passing them here.
220
221 The argument list may contain:
222
223 =over
224
225 =item *
226
227 The same 4-element argument set one would normally pass to
228 L<DBI/connect>, optionally followed by
229 L<extra attributes|/DBIx::Class specific connection attributes>
230 recognized by DBIx::Class:
231
232   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
233
234 =item *
235
236 A single code reference which returns a connected
237 L<DBI database handle|DBI/connect> optionally followed by
238 L<extra attributes|/DBIx::Class specific connection attributes> recognized
239 by DBIx::Class:
240
241   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
242
243 =item *
244
245 A single hashref with all the attributes and the dsn/user/password
246 mixed together:
247
248   $connect_info_args = [{
249     dsn => $dsn,
250     user => $user,
251     password => $pass,
252     %dbi_attributes,
253     %extra_attributes,
254   }];
255
256   $connect_info_args = [{
257     dbh_maker => sub { DBI->connect (...) },
258     %dbi_attributes,
259     %extra_attributes,
260   }];
261
262 This is particularly useful for L<Catalyst> based applications, allowing the
263 following config (L<Config::General> style):
264
265   <Model::DB>
266     schema_class   App::DB
267     <connect_info>
268       dsn          dbi:mysql:database=test
269       user         testuser
270       password     TestPass
271       AutoCommit   1
272     </connect_info>
273   </Model::DB>
274
275 The C<dsn>/C<user>/C<password> combination can be substituted by the
276 C<dbh_maker> key whose value is a coderef that returns a connected
277 L<DBI database handle|DBI/connect>
278
279 =back
280
281 Please note that the L<DBI> docs recommend that you always explicitly
282 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
283 recommends that it be set to I<1>, and that you perform transactions
284 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
285 to I<1> if you do not do explicitly set it to zero.  This is the default
286 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
287
288 =head3 DBIx::Class specific connection attributes
289
290 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
291 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
292 the following connection options. These options can be mixed in with your other
293 L<DBI> connection attributes, or placed in a separate hashref
294 (C<\%extra_attributes>) as shown above.
295
296 Every time C<connect_info> is invoked, any previous settings for
297 these options will be cleared before setting the new ones, regardless of
298 whether any options are specified in the new C<connect_info>.
299
300
301 =over
302
303 =item on_connect_do
304
305 Specifies things to do immediately after connecting or re-connecting to
306 the database.  Its value may contain:
307
308 =over
309
310 =item a scalar
311
312 This contains one SQL statement to execute.
313
314 =item an array reference
315
316 This contains SQL statements to execute in order.  Each element contains
317 a string or a code reference that returns a string.
318
319 =item a code reference
320
321 This contains some code to execute.  Unlike code references within an
322 array reference, its return value is ignored.
323
324 =back
325
326 =item on_disconnect_do
327
328 Takes arguments in the same form as L</on_connect_do> and executes them
329 immediately before disconnecting from the database.
330
331 Note, this only runs if you explicitly call L</disconnect> on the
332 storage object.
333
334 =item on_connect_call
335
336 A more generalized form of L</on_connect_do> that calls the specified
337 C<connect_call_METHOD> methods in your storage driver.
338
339   on_connect_do => 'select 1'
340
341 is equivalent to:
342
343   on_connect_call => [ [ do_sql => 'select 1' ] ]
344
345 Its values may contain:
346
347 =over
348
349 =item a scalar
350
351 Will call the C<connect_call_METHOD> method.
352
353 =item a code reference
354
355 Will execute C<< $code->($storage) >>
356
357 =item an array reference
358
359 Each value can be a method name or code reference.
360
361 =item an array of arrays
362
363 For each array, the first item is taken to be the C<connect_call_> method name
364 or code reference, and the rest are parameters to it.
365
366 =back
367
368 Some predefined storage methods you may use:
369
370 =over
371
372 =item do_sql
373
374 Executes a SQL string or a code reference that returns a SQL string. This is
375 what L</on_connect_do> and L</on_disconnect_do> use.
376
377 It can take:
378
379 =over
380
381 =item a scalar
382
383 Will execute the scalar as SQL.
384
385 =item an arrayref
386
387 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
388 attributes hashref and bind values.
389
390 =item a code reference
391
392 Will execute C<< $code->($storage) >> and execute the return array refs as
393 above.
394
395 =back
396
397 =item datetime_setup
398
399 Execute any statements necessary to initialize the database session to return
400 and accept datetime/timestamp values used with
401 L<DBIx::Class::InflateColumn::DateTime>.
402
403 Only necessary for some databases, see your specific storage driver for
404 implementation details.
405
406 =back
407
408 =item on_disconnect_call
409
410 Takes arguments in the same form as L</on_connect_call> and executes them
411 immediately before disconnecting from the database.
412
413 Calls the C<disconnect_call_METHOD> methods as opposed to the
414 C<connect_call_METHOD> methods called by L</on_connect_call>.
415
416 Note, this only runs if you explicitly call L</disconnect> on the
417 storage object.
418
419 =item disable_sth_caching
420
421 If set to a true value, this option will disable the caching of
422 statement handles via L<DBI/prepare_cached>.
423
424 =item limit_dialect
425
426 Sets the limit dialect. This is useful for JDBC-bridge among others
427 where the remote SQL-dialect cannot be determined by the name of the
428 driver alone. See also L<SQL::Abstract::Limit>.
429
430 =item quote_char
431
432 Specifies what characters to use to quote table and column names. If
433 you use this you will want to specify L</name_sep> as well.
434
435 C<quote_char> expects either a single character, in which case is it
436 is placed on either side of the table/column name, or an arrayref of length
437 2 in which case the table/column name is placed between the elements.
438
439 For example under MySQL you should use C<< quote_char => '`' >>, and for
440 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
441
442 =item name_sep
443
444 This only needs to be used in conjunction with C<quote_char>, and is used to
445 specify the character that separates elements (schemas, tables, columns) from
446 each other. In most cases this is simply a C<.>.
447
448 The consequences of not supplying this value is that L<SQL::Abstract>
449 will assume DBIx::Class' uses of aliases to be complete column
450 names. The output will look like I<"me.name"> when it should actually
451 be I<"me"."name">.
452
453 =item unsafe
454
455 This Storage driver normally installs its own C<HandleError>, sets
456 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
457 all database handles, including those supplied by a coderef.  It does this
458 so that it can have consistent and useful error behavior.
459
460 If you set this option to a true value, Storage will not do its usual
461 modifications to the database handle's attributes, and instead relies on
462 the settings in your connect_info DBI options (or the values you set in
463 your connection coderef, in the case that you are connecting via coderef).
464
465 Note that your custom settings can cause Storage to malfunction,
466 especially if you set a C<HandleError> handler that suppresses exceptions
467 and/or disable C<RaiseError>.
468
469 =item auto_savepoint
470
471 If this option is true, L<DBIx::Class> will use savepoints when nesting
472 transactions, making it possible to recover from failure in the inner
473 transaction without having to abort all outer transactions.
474
475 =item cursor_class
476
477 Use this argument to supply a cursor class other than the default
478 L<DBIx::Class::Storage::DBI::Cursor>.
479
480 =back
481
482 Some real-life examples of arguments to L</connect_info> and
483 L<DBIx::Class::Schema/connect>
484
485   # Simple SQLite connection
486   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
487
488   # Connect via subref
489   ->connect_info([ sub { DBI->connect(...) } ]);
490
491   # Connect via subref in hashref
492   ->connect_info([{
493     dbh_maker => sub { DBI->connect(...) },
494     on_connect_do => 'alter session ...',
495   }]);
496
497   # A bit more complicated
498   ->connect_info(
499     [
500       'dbi:Pg:dbname=foo',
501       'postgres',
502       'my_pg_password',
503       { AutoCommit => 1 },
504       { quote_char => q{"}, name_sep => q{.} },
505     ]
506   );
507
508   # Equivalent to the previous example
509   ->connect_info(
510     [
511       'dbi:Pg:dbname=foo',
512       'postgres',
513       'my_pg_password',
514       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
515     ]
516   );
517
518   # Same, but with hashref as argument
519   # See parse_connect_info for explanation
520   ->connect_info(
521     [{
522       dsn         => 'dbi:Pg:dbname=foo',
523       user        => 'postgres',
524       password    => 'my_pg_password',
525       AutoCommit  => 1,
526       quote_char  => q{"},
527       name_sep    => q{.},
528     }]
529   );
530
531   # Subref + DBIx::Class-specific connection options
532   ->connect_info(
533     [
534       sub { DBI->connect(...) },
535       {
536           quote_char => q{`},
537           name_sep => q{@},
538           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
539           disable_sth_caching => 1,
540       },
541     ]
542   );
543
544
545
546 =cut
547
548 sub connect_info {
549   my ($self, $info) = @_;
550
551   return $self->_connect_info if !$info;
552
553   $self->_connect_info($info); # copy for _connect_info
554
555   $info = $self->_normalize_connect_info($info)
556     if ref $info eq 'ARRAY';
557
558   for my $storage_opt (keys %{ $info->{storage_options} }) {
559     my $value = $info->{storage_options}{$storage_opt};
560
561     $self->$storage_opt($value);
562   }
563
564   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
565   #  the new set of options
566   $self->_sql_maker(undef);
567   $self->_sql_maker_opts({});
568
569   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
570     my $value = $info->{sql_maker_options}{$sql_maker_opt};
571
572     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
573   }
574
575   my %attrs = (
576     %{ $self->_default_dbi_connect_attributes || {} },
577     %{ $info->{attributes} || {} },
578   );
579
580   my @args = @{ $info->{arguments} };
581
582   $self->_dbi_connect_info([@args,
583     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
584
585   return $self->_connect_info;
586 }
587
588 sub _normalize_connect_info {
589   my ($self, $info_arg) = @_;
590   my %info;
591
592   my @args = @$info_arg;  # take a shallow copy for further mutilation
593
594   # combine/pre-parse arguments depending on invocation style
595
596   my %attrs;
597   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
598     %attrs = %{ $args[1] || {} };
599     @args = $args[0];
600   }
601   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
602     %attrs = %{$args[0]};
603     @args = ();
604     if (my $code = delete $attrs{dbh_maker}) {
605       @args = $code;
606
607       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
608       if (@ignored) {
609         carp sprintf (
610             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
611           . "to the result of 'dbh_maker'",
612
613           join (', ', map { "'$_'" } (@ignored) ),
614         );
615       }
616     }
617     else {
618       @args = delete @attrs{qw/dsn user password/};
619     }
620   }
621   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
622     %attrs = (
623       % { $args[3] || {} },
624       % { $args[4] || {} },
625     );
626     @args = @args[0,1,2];
627   }
628
629   $info{arguments} = \@args;
630
631   my @storage_opts = grep exists $attrs{$_},
632     @storage_options, 'cursor_class';
633
634   @{ $info{storage_options} }{@storage_opts} =
635     delete @attrs{@storage_opts} if @storage_opts;
636
637   my @sql_maker_opts = grep exists $attrs{$_},
638     qw/limit_dialect quote_char name_sep/;
639
640   @{ $info{sql_maker_options} }{@sql_maker_opts} =
641     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
642
643   $info{attributes} = \%attrs if %attrs;
644
645   return \%info;
646 }
647
648 sub _default_dbi_connect_attributes {
649   return {
650     AutoCommit => 1,
651     RaiseError => 1,
652     PrintError => 0,
653   };
654 }
655
656 =head2 on_connect_do
657
658 This method is deprecated in favour of setting via L</connect_info>.
659
660 =cut
661
662 =head2 on_disconnect_do
663
664 This method is deprecated in favour of setting via L</connect_info>.
665
666 =cut
667
668 sub _parse_connect_do {
669   my ($self, $type) = @_;
670
671   my $val = $self->$type;
672   return () if not defined $val;
673
674   my @res;
675
676   if (not ref($val)) {
677     push @res, [ 'do_sql', $val ];
678   } elsif (ref($val) eq 'CODE') {
679     push @res, $val;
680   } elsif (ref($val) eq 'ARRAY') {
681     push @res, map { [ 'do_sql', $_ ] } @$val;
682   } else {
683     $self->throw_exception("Invalid type for $type: ".ref($val));
684   }
685
686   return \@res;
687 }
688
689 =head2 dbh_do
690
691 Arguments: ($subref | $method_name), @extra_coderef_args?
692
693 Execute the given $subref or $method_name using the new exception-based
694 connection management.
695
696 The first two arguments will be the storage object that C<dbh_do> was called
697 on and a database handle to use.  Any additional arguments will be passed
698 verbatim to the called subref as arguments 2 and onwards.
699
700 Using this (instead of $self->_dbh or $self->dbh) ensures correct
701 exception handling and reconnection (or failover in future subclasses).
702
703 Your subref should have no side-effects outside of the database, as
704 there is the potential for your subref to be partially double-executed
705 if the database connection was stale/dysfunctional.
706
707 Example:
708
709   my @stuff = $schema->storage->dbh_do(
710     sub {
711       my ($storage, $dbh, @cols) = @_;
712       my $cols = join(q{, }, @cols);
713       $dbh->selectrow_array("SELECT $cols FROM foo");
714     },
715     @column_list
716   );
717
718 =cut
719
720 sub dbh_do {
721   my $self = shift;
722   my $code = shift;
723
724   my $dbh = $self->_get_dbh;
725
726   return $self->$code($dbh, @_)
727     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
728
729   local $self->{_in_dbh_do} = 1;
730
731   my @args = @_;
732   return try {
733     $self->$code ($dbh, @args);
734   } catch {
735     $self->throw_exception($_) if $self->connected;
736
737     # We were not connected - reconnect and retry, but let any
738     #  exception fall right through this time
739     carp "Retrying $code after catching disconnected exception: $_"
740       if $ENV{DBIC_DBIRETRY_DEBUG};
741
742     $self->_populate_dbh;
743     $self->$code($self->_dbh, @args);
744   };
745 }
746
747 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
748 # It also informs dbh_do to bypass itself while under the direction of txn_do,
749 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
750 sub txn_do {
751   my $self = shift;
752   my $coderef = shift;
753
754   ref $coderef eq 'CODE' or $self->throw_exception
755     ('$coderef must be a CODE reference');
756
757   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
758
759   local $self->{_in_dbh_do} = 1;
760
761   my @result;
762   my $want_array = wantarray;
763
764   my $tried = 0;
765   while(1) {
766     my $exception;
767     my @args = @_;
768     try {
769       $self->_get_dbh;
770
771       $self->txn_begin;
772       if($want_array) {
773           @result = $coderef->(@args);
774       }
775       elsif(defined $want_array) {
776           $result[0] = $coderef->(@args);
777       }
778       else {
779           $coderef->(@args);
780       }
781       $self->txn_commit;
782     } catch {
783       $exception = $_;
784     };
785
786     if(! defined $exception) { return $want_array ? @result : $result[0] }
787
788     if($tried++ || $self->connected) {
789       my $rollback_exception;
790       try { $self->txn_rollback } catch { $rollback_exception = shift };
791       if(defined $rollback_exception) {
792         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
793         $self->throw_exception($exception)  # propagate nested rollback
794           if $rollback_exception =~ /$exception_class/;
795
796         $self->throw_exception(
797           "Transaction aborted: ${exception}. "
798           . "Rollback failed: ${rollback_exception}"
799         );
800       }
801       $self->throw_exception($exception)
802     }
803
804     # We were not connected, and was first try - reconnect and retry
805     # via the while loop
806     carp "Retrying $coderef after catching disconnected exception: $exception"
807       if $ENV{DBIC_DBIRETRY_DEBUG};
808     $self->_populate_dbh;
809   }
810 }
811
812 =head2 disconnect
813
814 Our C<disconnect> method also performs a rollback first if the
815 database is not in C<AutoCommit> mode.
816
817 =cut
818
819 sub disconnect {
820   my ($self) = @_;
821
822   if( $self->_dbh ) {
823     my @actions;
824
825     push @actions, ( $self->on_disconnect_call || () );
826     push @actions, $self->_parse_connect_do ('on_disconnect_do');
827
828     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
829
830     $self->_dbh_rollback unless $self->_dbh_autocommit;
831
832     %{ $self->_dbh->{CachedKids} } = ();
833     $self->_dbh->disconnect;
834     $self->_dbh(undef);
835     $self->{_dbh_gen}++;
836   }
837 }
838
839 =head2 with_deferred_fk_checks
840
841 =over 4
842
843 =item Arguments: C<$coderef>
844
845 =item Return Value: The return value of $coderef
846
847 =back
848
849 Storage specific method to run the code ref with FK checks deferred or
850 in MySQL's case disabled entirely.
851
852 =cut
853
854 # Storage subclasses should override this
855 sub with_deferred_fk_checks {
856   my ($self, $sub) = @_;
857   $sub->();
858 }
859
860 =head2 connected
861
862 =over
863
864 =item Arguments: none
865
866 =item Return Value: 1|0
867
868 =back
869
870 Verifies that the current database handle is active and ready to execute
871 an SQL statement (e.g. the connection did not get stale, server is still
872 answering, etc.) This method is used internally by L</dbh>.
873
874 =cut
875
876 sub connected {
877   my $self = shift;
878   return 0 unless $self->_seems_connected;
879
880   #be on the safe side
881   local $self->_dbh->{RaiseError} = 1;
882
883   return $self->_ping;
884 }
885
886 sub _seems_connected {
887   my $self = shift;
888
889   $self->_preserve_foreign_dbh;
890
891   my $dbh = $self->_dbh
892     or return 0;
893
894   return $dbh->FETCH('Active');
895 }
896
897 sub _ping {
898   my $self = shift;
899
900   my $dbh = $self->_dbh or return 0;
901
902   return $dbh->ping;
903 }
904
905 sub ensure_connected {
906   my ($self) = @_;
907
908   unless ($self->connected) {
909     $self->_populate_dbh;
910   }
911 }
912
913 =head2 dbh
914
915 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
916 is guaranteed to be healthy by implicitly calling L</connected>, and if
917 necessary performing a reconnection before returning. Keep in mind that this
918 is very B<expensive> on some database engines. Consider using L</dbh_do>
919 instead.
920
921 =cut
922
923 sub dbh {
924   my ($self) = @_;
925
926   if (not $self->_dbh) {
927     $self->_populate_dbh;
928   } else {
929     $self->ensure_connected;
930   }
931   return $self->_dbh;
932 }
933
934 # this is the internal "get dbh or connect (don't check)" method
935 sub _get_dbh {
936   my $self = shift;
937   $self->_preserve_foreign_dbh;
938   $self->_populate_dbh unless $self->_dbh;
939   return $self->_dbh;
940 }
941
942 sub _sql_maker_args {
943     my ($self) = @_;
944
945     return (
946       bindtype=>'columns',
947       array_datatypes => 1,
948       limit_dialect => $self->_get_dbh,
949       %{$self->_sql_maker_opts}
950     );
951 }
952
953 sub sql_maker {
954   my ($self) = @_;
955   unless ($self->_sql_maker) {
956     my $sql_maker_class = $self->sql_maker_class;
957     $self->ensure_class_loaded ($sql_maker_class);
958     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
959   }
960   return $self->_sql_maker;
961 }
962
963 # nothing to do by default
964 sub _rebless {}
965 sub _init {}
966
967 sub _populate_dbh {
968   my ($self) = @_;
969
970   my @info = @{$self->_dbi_connect_info || []};
971   $self->_dbh(undef); # in case ->connected failed we might get sent here
972   $self->_server_info_hash (undef);
973   $self->_dbh($self->_connect(@info));
974
975   $self->_conn_pid($$);
976   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
977
978   $self->_determine_driver;
979
980   # Always set the transaction depth on connect, since
981   #  there is no transaction in progress by definition
982   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
983
984   $self->_run_connection_actions unless $self->{_in_determine_driver};
985 }
986
987 sub _run_connection_actions {
988   my $self = shift;
989   my @actions;
990
991   push @actions, ( $self->on_connect_call || () );
992   push @actions, $self->_parse_connect_do ('on_connect_do');
993
994   $self->_do_connection_actions(connect_call_ => $_) for @actions;
995 }
996
997 sub _server_info {
998   my $self = shift;
999
1000   unless ($self->_server_info_hash) {
1001
1002     my %info;
1003
1004     my $server_version = try { $self->_get_server_version };
1005
1006     if (defined $server_version) {
1007       $info{dbms_version} = $server_version;
1008
1009       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1010       my @verparts = split (/\./, $numeric_version);
1011       if (
1012         @verparts
1013           &&
1014         $verparts[0] <= 999
1015       ) {
1016         # consider only up to 3 version parts, iff not more than 3 digits
1017         my @use_parts;
1018         while (@verparts && @use_parts < 3) {
1019           my $p = shift @verparts;
1020           last if $p > 999;
1021           push @use_parts, $p;
1022         }
1023         push @use_parts, 0 while @use_parts < 3;
1024
1025         $info{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1026       }
1027     }
1028
1029     $self->_server_info_hash(\%info);
1030   }
1031
1032   return $self->_server_info_hash
1033 }
1034
1035 sub _get_server_version {
1036   shift->_get_dbh->get_info(18);
1037 }
1038
1039 sub _determine_driver {
1040   my ($self) = @_;
1041
1042   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1043     my $started_connected = 0;
1044     local $self->{_in_determine_driver} = 1;
1045
1046     if (ref($self) eq __PACKAGE__) {
1047       my $driver;
1048       if ($self->_dbh) { # we are connected
1049         $driver = $self->_dbh->{Driver}{Name};
1050         $started_connected = 1;
1051       } else {
1052         # if connect_info is a CODEREF, we have no choice but to connect
1053         if (ref $self->_dbi_connect_info->[0] &&
1054             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
1055           $self->_populate_dbh;
1056           $driver = $self->_dbh->{Driver}{Name};
1057         }
1058         else {
1059           # try to use dsn to not require being connected, the driver may still
1060           # force a connection in _rebless to determine version
1061           # (dsn may not be supplied at all if all we do is make a mock-schema)
1062           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1063           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1064           $driver ||= $ENV{DBI_DRIVER};
1065         }
1066       }
1067
1068       if ($driver) {
1069         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1070         if ($self->load_optional_class($storage_class)) {
1071           mro::set_mro($storage_class, 'c3');
1072           bless $self, $storage_class;
1073           $self->_rebless();
1074         }
1075       }
1076     }
1077
1078     $self->_driver_determined(1);
1079
1080     $self->_init; # run driver-specific initializations
1081
1082     $self->_run_connection_actions
1083         if !$started_connected && defined $self->_dbh;
1084   }
1085 }
1086
1087 sub _do_connection_actions {
1088   my $self          = shift;
1089   my $method_prefix = shift;
1090   my $call          = shift;
1091
1092   if (not ref($call)) {
1093     my $method = $method_prefix . $call;
1094     $self->$method(@_);
1095   } elsif (ref($call) eq 'CODE') {
1096     $self->$call(@_);
1097   } elsif (ref($call) eq 'ARRAY') {
1098     if (ref($call->[0]) ne 'ARRAY') {
1099       $self->_do_connection_actions($method_prefix, $_) for @$call;
1100     } else {
1101       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1102     }
1103   } else {
1104     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1105   }
1106
1107   return $self;
1108 }
1109
1110 sub connect_call_do_sql {
1111   my $self = shift;
1112   $self->_do_query(@_);
1113 }
1114
1115 sub disconnect_call_do_sql {
1116   my $self = shift;
1117   $self->_do_query(@_);
1118 }
1119
1120 # override in db-specific backend when necessary
1121 sub connect_call_datetime_setup { 1 }
1122
1123 sub _do_query {
1124   my ($self, $action) = @_;
1125
1126   if (ref $action eq 'CODE') {
1127     $action = $action->($self);
1128     $self->_do_query($_) foreach @$action;
1129   }
1130   else {
1131     # Most debuggers expect ($sql, @bind), so we need to exclude
1132     # the attribute hash which is the second argument to $dbh->do
1133     # furthermore the bind values are usually to be presented
1134     # as named arrayref pairs, so wrap those here too
1135     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1136     my $sql = shift @do_args;
1137     my $attrs = shift @do_args;
1138     my @bind = map { [ undef, $_ ] } @do_args;
1139
1140     $self->_query_start($sql, @bind);
1141     $self->_get_dbh->do($sql, $attrs, @do_args);
1142     $self->_query_end($sql, @bind);
1143   }
1144
1145   return $self;
1146 }
1147
1148 sub _connect {
1149   my ($self, @info) = @_;
1150
1151   $self->throw_exception("You failed to provide any connection info")
1152     if !@info;
1153
1154   my ($old_connect_via, $dbh);
1155
1156   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1157     $old_connect_via = $DBI::connect_via;
1158     $DBI::connect_via = 'connect';
1159   }
1160
1161   try {
1162     if(ref $info[0] eq 'CODE') {
1163        $dbh = $info[0]->();
1164     }
1165     else {
1166        $dbh = DBI->connect(@info);
1167     }
1168
1169     if (!$dbh) {
1170       die $DBI::errstr;
1171     }
1172
1173     unless ($self->unsafe) {
1174       my $weak_self = $self;
1175       Scalar::Util::weaken($weak_self);
1176       $dbh->{HandleError} = sub {
1177           if ($weak_self) {
1178             $weak_self->throw_exception("DBI Exception: $_[0]");
1179           }
1180           else {
1181             # the handler may be invoked by something totally out of
1182             # the scope of DBIC
1183             croak ("DBI Exception: $_[0]");
1184           }
1185       };
1186       $dbh->{ShowErrorStatement} = 1;
1187       $dbh->{RaiseError} = 1;
1188       $dbh->{PrintError} = 0;
1189     }
1190   }
1191   catch {
1192     $self->throw_exception("DBI Connection failed: $_")
1193   }
1194   finally {
1195     $DBI::connect_via = $old_connect_via if $old_connect_via;
1196   };
1197
1198   $self->_dbh_autocommit($dbh->{AutoCommit});
1199   $dbh;
1200 }
1201
1202 sub svp_begin {
1203   my ($self, $name) = @_;
1204
1205   $name = $self->_svp_generate_name
1206     unless defined $name;
1207
1208   $self->throw_exception ("You can't use savepoints outside a transaction")
1209     if $self->{transaction_depth} == 0;
1210
1211   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1212     unless $self->can('_svp_begin');
1213
1214   push @{ $self->{savepoints} }, $name;
1215
1216   $self->debugobj->svp_begin($name) if $self->debug;
1217
1218   return $self->_svp_begin($name);
1219 }
1220
1221 sub svp_release {
1222   my ($self, $name) = @_;
1223
1224   $self->throw_exception ("You can't use savepoints outside a transaction")
1225     if $self->{transaction_depth} == 0;
1226
1227   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1228     unless $self->can('_svp_release');
1229
1230   if (defined $name) {
1231     $self->throw_exception ("Savepoint '$name' does not exist")
1232       unless grep { $_ eq $name } @{ $self->{savepoints} };
1233
1234     # Dig through the stack until we find the one we are releasing.  This keeps
1235     # the stack up to date.
1236     my $svp;
1237
1238     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1239   } else {
1240     $name = pop @{ $self->{savepoints} };
1241   }
1242
1243   $self->debugobj->svp_release($name) if $self->debug;
1244
1245   return $self->_svp_release($name);
1246 }
1247
1248 sub svp_rollback {
1249   my ($self, $name) = @_;
1250
1251   $self->throw_exception ("You can't use savepoints outside a transaction")
1252     if $self->{transaction_depth} == 0;
1253
1254   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1255     unless $self->can('_svp_rollback');
1256
1257   if (defined $name) {
1258       # If they passed us a name, verify that it exists in the stack
1259       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1260           $self->throw_exception("Savepoint '$name' does not exist!");
1261       }
1262
1263       # Dig through the stack until we find the one we are releasing.  This keeps
1264       # the stack up to date.
1265       while(my $s = pop(@{ $self->{savepoints} })) {
1266           last if($s eq $name);
1267       }
1268       # Add the savepoint back to the stack, as a rollback doesn't remove the
1269       # named savepoint, only everything after it.
1270       push(@{ $self->{savepoints} }, $name);
1271   } else {
1272       # We'll assume they want to rollback to the last savepoint
1273       $name = $self->{savepoints}->[-1];
1274   }
1275
1276   $self->debugobj->svp_rollback($name) if $self->debug;
1277
1278   return $self->_svp_rollback($name);
1279 }
1280
1281 sub _svp_generate_name {
1282     my ($self) = @_;
1283
1284     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1285 }
1286
1287 sub txn_begin {
1288   my $self = shift;
1289
1290   # this means we have not yet connected and do not know the AC status
1291   # (e.g. coderef $dbh)
1292   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1293
1294   if($self->{transaction_depth} == 0) {
1295     $self->debugobj->txn_begin()
1296       if $self->debug;
1297     $self->_dbh_begin_work;
1298   }
1299   elsif ($self->auto_savepoint) {
1300     $self->svp_begin;
1301   }
1302   $self->{transaction_depth}++;
1303 }
1304
1305 sub _dbh_begin_work {
1306   my $self = shift;
1307
1308   # if the user is utilizing txn_do - good for him, otherwise we need to
1309   # ensure that the $dbh is healthy on BEGIN.
1310   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1311   # will be replaced by a failure of begin_work itself (which will be
1312   # then retried on reconnect)
1313   if ($self->{_in_dbh_do}) {
1314     $self->_dbh->begin_work;
1315   } else {
1316     $self->dbh_do(sub { $_[1]->begin_work });
1317   }
1318 }
1319
1320 sub txn_commit {
1321   my $self = shift;
1322   if ($self->{transaction_depth} == 1) {
1323     $self->debugobj->txn_commit()
1324       if ($self->debug);
1325     $self->_dbh_commit;
1326     $self->{transaction_depth} = 0
1327       if $self->_dbh_autocommit;
1328   }
1329   elsif($self->{transaction_depth} > 1) {
1330     $self->{transaction_depth}--;
1331     $self->svp_release
1332       if $self->auto_savepoint;
1333   }
1334 }
1335
1336 sub _dbh_commit {
1337   my $self = shift;
1338   my $dbh  = $self->_dbh
1339     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1340   $dbh->commit;
1341 }
1342
1343 sub txn_rollback {
1344   my $self = shift;
1345   my $dbh = $self->_dbh;
1346   try {
1347     if ($self->{transaction_depth} == 1) {
1348       $self->debugobj->txn_rollback()
1349         if ($self->debug);
1350       $self->{transaction_depth} = 0
1351         if $self->_dbh_autocommit;
1352       $self->_dbh_rollback;
1353     }
1354     elsif($self->{transaction_depth} > 1) {
1355       $self->{transaction_depth}--;
1356       if ($self->auto_savepoint) {
1357         $self->svp_rollback;
1358         $self->svp_release;
1359       }
1360     }
1361     else {
1362       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1363     }
1364   }
1365   catch {
1366     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1367
1368     if ($_ !~ /$exception_class/) {
1369       # ensure that a failed rollback resets the transaction depth
1370       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1371     }
1372
1373     $self->throw_exception($_)
1374   };
1375 }
1376
1377 sub _dbh_rollback {
1378   my $self = shift;
1379   my $dbh  = $self->_dbh
1380     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1381   $dbh->rollback;
1382 }
1383
1384 # This used to be the top-half of _execute.  It was split out to make it
1385 #  easier to override in NoBindVars without duping the rest.  It takes up
1386 #  all of _execute's args, and emits $sql, @bind.
1387 sub _prep_for_execute {
1388   my ($self, $op, $extra_bind, $ident, $args) = @_;
1389
1390   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1391     $ident = $ident->from();
1392   }
1393
1394   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1395
1396   unshift(@bind,
1397     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1398       if $extra_bind;
1399   return ($sql, \@bind);
1400 }
1401
1402
1403 sub _fix_bind_params {
1404     my ($self, @bind) = @_;
1405
1406     ### Turn @bind from something like this:
1407     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1408     ### to this:
1409     ###   ( "'1'", "'1'", "'3'" )
1410     return
1411         map {
1412             if ( defined( $_ && $_->[1] ) ) {
1413                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1414             }
1415             else { q{'NULL'}; }
1416         } @bind;
1417 }
1418
1419 sub _query_start {
1420     my ( $self, $sql, @bind ) = @_;
1421
1422     if ( $self->debug ) {
1423         @bind = $self->_fix_bind_params(@bind);
1424
1425         $self->debugobj->query_start( $sql, @bind );
1426     }
1427 }
1428
1429 sub _query_end {
1430     my ( $self, $sql, @bind ) = @_;
1431
1432     if ( $self->debug ) {
1433         @bind = $self->_fix_bind_params(@bind);
1434         $self->debugobj->query_end( $sql, @bind );
1435     }
1436 }
1437
1438 sub _dbh_execute {
1439   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1440
1441   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1442
1443   $self->_query_start( $sql, @$bind );
1444
1445   my $sth = $self->sth($sql,$op);
1446
1447   my $placeholder_index = 1;
1448
1449   foreach my $bound (@$bind) {
1450     my $attributes = {};
1451     my($column_name, @data) = @$bound;
1452
1453     if ($bind_attributes) {
1454       $attributes = $bind_attributes->{$column_name}
1455       if defined $bind_attributes->{$column_name};
1456     }
1457
1458     foreach my $data (@data) {
1459       my $ref = ref $data;
1460       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1461
1462       $sth->bind_param($placeholder_index, $data, $attributes);
1463       $placeholder_index++;
1464     }
1465   }
1466
1467   # Can this fail without throwing an exception anyways???
1468   my $rv = $sth->execute();
1469   $self->throw_exception(
1470     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1471   ) if !$rv;
1472
1473   $self->_query_end( $sql, @$bind );
1474
1475   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1476 }
1477
1478 sub _execute {
1479     my $self = shift;
1480     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1481 }
1482
1483 sub _prefetch_insert_auto_nextvals {
1484   my ($self, $source, $to_insert) = @_;
1485
1486   my $upd = {};
1487
1488   foreach my $col ( $source->columns ) {
1489     if ( !defined $to_insert->{$col} ) {
1490       my $col_info = $source->column_info($col);
1491
1492       if ( $col_info->{auto_nextval} ) {
1493         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1494           'nextval',
1495           $col_info->{sequence} ||=
1496             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1497         );
1498       }
1499     }
1500   }
1501
1502   return $upd;
1503 }
1504
1505 sub insert {
1506   my $self = shift;
1507   my ($source, $to_insert, $opts) = @_;
1508
1509   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1510
1511   my $bind_attributes = $self->source_bind_attributes($source);
1512
1513   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1514
1515   if ($opts->{returning}) {
1516     my @ret_cols = @{$opts->{returning}};
1517
1518     my @ret_vals = try {
1519       local $SIG{__WARN__} = sub {};
1520       my @r = $sth->fetchrow_array;
1521       $sth->finish;
1522       @r;
1523     };
1524
1525     my %ret;
1526     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1527
1528     $updated_cols = {
1529       %$updated_cols,
1530       %ret,
1531     };
1532   }
1533
1534   return $updated_cols;
1535 }
1536
1537 ## Currently it is assumed that all values passed will be "normal", i.e. not
1538 ## scalar refs, or at least, all the same type as the first set, the statement is
1539 ## only prepped once.
1540 sub insert_bulk {
1541   my ($self, $source, $cols, $data) = @_;
1542
1543   my %colvalues;
1544   @colvalues{@$cols} = (0..$#$cols);
1545
1546   for my $i (0..$#$cols) {
1547     my $first_val = $data->[0][$i];
1548     next unless ref $first_val eq 'SCALAR';
1549
1550     $colvalues{ $cols->[$i] } = $first_val;
1551   }
1552
1553   # check for bad data and stringify stringifiable objects
1554   my $bad_slice = sub {
1555     my ($msg, $col_idx, $slice_idx) = @_;
1556     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1557       $msg,
1558       $cols->[$col_idx],
1559       do {
1560         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1561         Data::Dumper::Concise::Dumper({
1562           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1563         }),
1564       }
1565     );
1566   };
1567
1568   for my $datum_idx (0..$#$data) {
1569     my $datum = $data->[$datum_idx];
1570
1571     for my $col_idx (0..$#$cols) {
1572       my $val            = $datum->[$col_idx];
1573       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1574       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1575
1576       if ($is_literal_sql) {
1577         if (not ref $val) {
1578           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1579         }
1580         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1581           $bad_slice->("$reftype reference found where literal SQL expected",
1582             $col_idx, $datum_idx);
1583         }
1584         elsif ($$val ne $$sqla_bind){
1585           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1586             $col_idx, $datum_idx);
1587         }
1588       }
1589       elsif (my $reftype = ref $val) {
1590         require overload;
1591         if (overload::Method($val, '""')) {
1592           $datum->[$col_idx] = "".$val;
1593         }
1594         else {
1595           $bad_slice->("$reftype reference found where bind expected",
1596             $col_idx, $datum_idx);
1597         }
1598       }
1599     }
1600   }
1601
1602   my ($sql, $bind) = $self->_prep_for_execute (
1603     'insert', undef, $source, [\%colvalues]
1604   );
1605   my @bind = @$bind;
1606
1607   my $empty_bind = 1 if (not @bind) &&
1608     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1609
1610   if ((not @bind) && (not $empty_bind)) {
1611     $self->throw_exception(
1612       'Cannot insert_bulk without support for placeholders'
1613     );
1614   }
1615
1616   # neither _execute_array, nor _execute_inserts_with_no_binds are
1617   # atomic (even if _execute _array is a single call). Thus a safety
1618   # scope guard
1619   my $guard = $self->txn_scope_guard;
1620
1621   $self->_query_start( $sql, ['__BULK__'] );
1622   my $sth = $self->sth($sql);
1623   my $rv = do {
1624     if ($empty_bind) {
1625       # bind_param_array doesn't work if there are no binds
1626       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1627     }
1628     else {
1629 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1630       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1631     }
1632   };
1633
1634   $self->_query_end( $sql, ['__BULK__'] );
1635
1636   $guard->commit;
1637
1638   return (wantarray ? ($rv, $sth, @bind) : $rv);
1639 }
1640
1641 sub _execute_array {
1642   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1643
1644   ## This must be an arrayref, else nothing works!
1645   my $tuple_status = [];
1646
1647   ## Get the bind_attributes, if any exist
1648   my $bind_attributes = $self->source_bind_attributes($source);
1649
1650   ## Bind the values and execute
1651   my $placeholder_index = 1;
1652
1653   foreach my $bound (@$bind) {
1654
1655     my $attributes = {};
1656     my ($column_name, $data_index) = @$bound;
1657
1658     if( $bind_attributes ) {
1659       $attributes = $bind_attributes->{$column_name}
1660       if defined $bind_attributes->{$column_name};
1661     }
1662
1663     my @data = map { $_->[$data_index] } @$data;
1664
1665     $sth->bind_param_array(
1666       $placeholder_index,
1667       [@data],
1668       (%$attributes ?  $attributes : ()),
1669     );
1670     $placeholder_index++;
1671   }
1672
1673   my ($rv, $err);
1674   try {
1675     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1676   }
1677   catch {
1678     $err = shift;
1679   }
1680   finally {
1681     # Statement must finish even if there was an exception.
1682     try {
1683       $sth->finish
1684     }
1685     catch {
1686       $err = shift unless defined $err
1687     };
1688   };
1689
1690   $err = $sth->errstr
1691     if (! defined $err and $sth->err);
1692
1693   if (defined $err) {
1694     my $i = 0;
1695     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1696
1697     $self->throw_exception("Unexpected populate error: $err")
1698       if ($i > $#$tuple_status);
1699
1700     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1701       ($tuple_status->[$i][1] || $err),
1702       Data::Dumper::Concise::Dumper({
1703         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1704       }),
1705     );
1706   }
1707
1708   return $rv;
1709 }
1710
1711 sub _dbh_execute_array {
1712     my ($self, $sth, $tuple_status, @extra) = @_;
1713
1714     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1715 }
1716
1717 sub _dbh_execute_inserts_with_no_binds {
1718   my ($self, $sth, $count) = @_;
1719
1720   my $err;
1721   try {
1722     my $dbh = $self->_get_dbh;
1723     local $dbh->{RaiseError} = 1;
1724     local $dbh->{PrintError} = 0;
1725
1726     $sth->execute foreach 1..$count;
1727   }
1728   catch {
1729     $err = shift;
1730   }
1731   finally {
1732     # Make sure statement is finished even if there was an exception.
1733     try {
1734       $sth->finish
1735     }
1736     catch {
1737       $err = shift unless defined $err;
1738     };
1739   };
1740
1741   $self->throw_exception($err) if defined $err;
1742
1743   return $count;
1744 }
1745
1746 sub update {
1747   my ($self, $source, @args) = @_;
1748
1749   my $bind_attrs = $self->source_bind_attributes($source);
1750
1751   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1752 }
1753
1754
1755 sub delete {
1756   my ($self, $source, @args) = @_;
1757
1758   my $bind_attrs = $self->source_bind_attributes($source);
1759
1760   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1761 }
1762
1763 # We were sent here because the $rs contains a complex search
1764 # which will require a subquery to select the correct rows
1765 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1766 #
1767 # Generating a single PK column subquery is trivial and supported
1768 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1769 # Look at _multipk_update_delete()
1770 sub _subq_update_delete {
1771   my $self = shift;
1772   my ($rs, $op, $values) = @_;
1773
1774   my $rsrc = $rs->result_source;
1775
1776   # quick check if we got a sane rs on our hands
1777   my @pcols = $rsrc->_pri_cols;
1778
1779   my $sel = $rs->_resolved_attrs->{select};
1780   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1781
1782   if (
1783       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1784         ne
1785       join ("\x00", sort @$sel )
1786   ) {
1787     $self->throw_exception (
1788       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1789     );
1790   }
1791
1792   if (@pcols == 1) {
1793     return $self->$op (
1794       $rsrc,
1795       $op eq 'update' ? $values : (),
1796       { $pcols[0] => { -in => $rs->as_query } },
1797     );
1798   }
1799
1800   else {
1801     return $self->_multipk_update_delete (@_);
1802   }
1803 }
1804
1805 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1806 # resultset update/delete involving subqueries. So by default resort
1807 # to simple (and inefficient) delete_all style per-row opearations,
1808 # while allowing specific storages to override this with a faster
1809 # implementation.
1810 #
1811 sub _multipk_update_delete {
1812   return shift->_per_row_update_delete (@_);
1813 }
1814
1815 # This is the default loop used to delete/update rows for multi PK
1816 # resultsets, and used by mysql exclusively (because it can't do anything
1817 # else).
1818 #
1819 # We do not use $row->$op style queries, because resultset update/delete
1820 # is not expected to cascade (this is what delete_all/update_all is for).
1821 #
1822 # There should be no race conditions as the entire operation is rolled
1823 # in a transaction.
1824 #
1825 sub _per_row_update_delete {
1826   my $self = shift;
1827   my ($rs, $op, $values) = @_;
1828
1829   my $rsrc = $rs->result_source;
1830   my @pcols = $rsrc->_pri_cols;
1831
1832   my $guard = $self->txn_scope_guard;
1833
1834   # emulate the return value of $sth->execute for non-selects
1835   my $row_cnt = '0E0';
1836
1837   my $subrs_cur = $rs->cursor;
1838   my @all_pk = $subrs_cur->all;
1839   for my $pks ( @all_pk) {
1840
1841     my $cond;
1842     for my $i (0.. $#pcols) {
1843       $cond->{$pcols[$i]} = $pks->[$i];
1844     }
1845
1846     $self->$op (
1847       $rsrc,
1848       $op eq 'update' ? $values : (),
1849       $cond,
1850     );
1851
1852     $row_cnt++;
1853   }
1854
1855   $guard->commit;
1856
1857   return $row_cnt;
1858 }
1859
1860 sub _select {
1861   my $self = shift;
1862   $self->_execute($self->_select_args(@_));
1863 }
1864
1865 sub _select_args_to_query {
1866   my $self = shift;
1867
1868   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1869   #  = $self->_select_args($ident, $select, $cond, $attrs);
1870   my ($op, $bind, $ident, $bind_attrs, @args) =
1871     $self->_select_args(@_);
1872
1873   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1874   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1875   $prepared_bind ||= [];
1876
1877   return wantarray
1878     ? ($sql, $prepared_bind, $bind_attrs)
1879     : \[ "($sql)", @$prepared_bind ]
1880   ;
1881 }
1882
1883 sub _select_args {
1884   my ($self, $ident, $select, $where, $attrs) = @_;
1885
1886   my $sql_maker = $self->sql_maker;
1887   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1888
1889   $attrs = {
1890     %$attrs,
1891     select => $select,
1892     from => $ident,
1893     where => $where,
1894     $rs_alias && $alias2source->{$rs_alias}
1895       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
1896       : ()
1897     ,
1898   };
1899
1900   # calculate bind_attrs before possible $ident mangling
1901   my $bind_attrs = {};
1902   for my $alias (keys %$alias2source) {
1903     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1904     for my $col (keys %$bindtypes) {
1905
1906       my $fqcn = join ('.', $alias, $col);
1907       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1908
1909       # Unqialified column names are nice, but at the same time can be
1910       # rather ambiguous. What we do here is basically go along with
1911       # the loop, adding an unqualified column slot to $bind_attrs,
1912       # alongside the fully qualified name. As soon as we encounter
1913       # another column by that name (which would imply another table)
1914       # we unset the unqualified slot and never add any info to it
1915       # to avoid erroneous type binding. If this happens the users
1916       # only choice will be to fully qualify his column name
1917
1918       if (exists $bind_attrs->{$col}) {
1919         $bind_attrs->{$col} = {};
1920       }
1921       else {
1922         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1923       }
1924     }
1925   }
1926
1927   # adjust limits
1928   if (defined $attrs->{rows}) {
1929     $self->throw_exception("rows attribute must be positive if present")
1930       unless $attrs->{rows} > 0;
1931   }
1932   elsif (defined $attrs->{offset}) {
1933     # MySQL actually recommends this approach.  I cringe.
1934     $attrs->{rows} = 2**32;
1935   }
1936
1937   my @limit;
1938
1939   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
1940   # storage, unless software limit was requested
1941   if (
1942     #limited has_many
1943     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1944        ||
1945     # grouped prefetch (to satisfy group_by == select)
1946     ( $attrs->{group_by}
1947         &&
1948       @{$attrs->{group_by}}
1949         &&
1950       $attrs->{_prefetch_select}
1951         &&
1952       @{$attrs->{_prefetch_select}}
1953     )
1954   ) {
1955     ($ident, $select, $where, $attrs)
1956       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1957   }
1958   elsif (! $attrs->{software_limit} ) {
1959     push @limit, $attrs->{rows}, $attrs->{offset};
1960   }
1961
1962   # try to simplify the joinmap further (prune unreferenced type-single joins)
1963   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
1964
1965 ###
1966   # This would be the point to deflate anything found in $where
1967   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1968   # expect a row object. And all we have is a resultsource (it is trivial
1969   # to extract deflator coderefs via $alias2source above).
1970   #
1971   # I don't see a way forward other than changing the way deflators are
1972   # invoked, and that's just bad...
1973 ###
1974
1975   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
1976 }
1977
1978 # Returns a counting SELECT for a simple count
1979 # query. Abstracted so that a storage could override
1980 # this to { count => 'firstcol' } or whatever makes
1981 # sense as a performance optimization
1982 sub _count_select {
1983   #my ($self, $source, $rs_attrs) = @_;
1984   return { count => '*' };
1985 }
1986
1987
1988 sub source_bind_attributes {
1989   my ($self, $source) = @_;
1990
1991   my $bind_attributes;
1992   foreach my $column ($source->columns) {
1993
1994     my $data_type = $source->column_info($column)->{data_type} || '';
1995     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1996      if $data_type;
1997   }
1998
1999   return $bind_attributes;
2000 }
2001
2002 =head2 select
2003
2004 =over 4
2005
2006 =item Arguments: $ident, $select, $condition, $attrs
2007
2008 =back
2009
2010 Handle a SQL select statement.
2011
2012 =cut
2013
2014 sub select {
2015   my $self = shift;
2016   my ($ident, $select, $condition, $attrs) = @_;
2017   return $self->cursor_class->new($self, \@_, $attrs);
2018 }
2019
2020 sub select_single {
2021   my $self = shift;
2022   my ($rv, $sth, @bind) = $self->_select(@_);
2023   my @row = $sth->fetchrow_array;
2024   my @nextrow = $sth->fetchrow_array if @row;
2025   if(@row && @nextrow) {
2026     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2027   }
2028   # Need to call finish() to work round broken DBDs
2029   $sth->finish();
2030   return @row;
2031 }
2032
2033 =head2 sth
2034
2035 =over 4
2036
2037 =item Arguments: $sql
2038
2039 =back
2040
2041 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2042
2043 =cut
2044
2045 sub _dbh_sth {
2046   my ($self, $dbh, $sql) = @_;
2047
2048   # 3 is the if_active parameter which avoids active sth re-use
2049   my $sth = $self->disable_sth_caching
2050     ? $dbh->prepare($sql)
2051     : $dbh->prepare_cached($sql, {}, 3);
2052
2053   # XXX You would think RaiseError would make this impossible,
2054   #  but apparently that's not true :(
2055   $self->throw_exception($dbh->errstr) if !$sth;
2056
2057   $sth;
2058 }
2059
2060 sub sth {
2061   my ($self, $sql) = @_;
2062   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2063 }
2064
2065 sub _dbh_columns_info_for {
2066   my ($self, $dbh, $table) = @_;
2067
2068   if ($dbh->can('column_info')) {
2069     my %result;
2070     my $caught;
2071     try {
2072       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2073       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2074       $sth->execute();
2075       while ( my $info = $sth->fetchrow_hashref() ){
2076         my %column_info;
2077         $column_info{data_type}   = $info->{TYPE_NAME};
2078         $column_info{size}      = $info->{COLUMN_SIZE};
2079         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2080         $column_info{default_value} = $info->{COLUMN_DEF};
2081         my $col_name = $info->{COLUMN_NAME};
2082         $col_name =~ s/^\"(.*)\"$/$1/;
2083
2084         $result{$col_name} = \%column_info;
2085       }
2086     } catch {
2087       $caught = 1;
2088     };
2089     return \%result if !$caught && scalar keys %result;
2090   }
2091
2092   my %result;
2093   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2094   $sth->execute;
2095   my @columns = @{$sth->{NAME_lc}};
2096   for my $i ( 0 .. $#columns ){
2097     my %column_info;
2098     $column_info{data_type} = $sth->{TYPE}->[$i];
2099     $column_info{size} = $sth->{PRECISION}->[$i];
2100     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2101
2102     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2103       $column_info{data_type} = $1;
2104       $column_info{size}    = $2;
2105     }
2106
2107     $result{$columns[$i]} = \%column_info;
2108   }
2109   $sth->finish;
2110
2111   foreach my $col (keys %result) {
2112     my $colinfo = $result{$col};
2113     my $type_num = $colinfo->{data_type};
2114     my $type_name;
2115     if(defined $type_num && $dbh->can('type_info')) {
2116       my $type_info = $dbh->type_info($type_num);
2117       $type_name = $type_info->{TYPE_NAME} if $type_info;
2118       $colinfo->{data_type} = $type_name if $type_name;
2119     }
2120   }
2121
2122   return \%result;
2123 }
2124
2125 sub columns_info_for {
2126   my ($self, $table) = @_;
2127   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2128 }
2129
2130 =head2 last_insert_id
2131
2132 Return the row id of the last insert.
2133
2134 =cut
2135
2136 sub _dbh_last_insert_id {
2137     my ($self, $dbh, $source, $col) = @_;
2138
2139     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2140
2141     return $id if defined $id;
2142
2143     my $class = ref $self;
2144     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2145 }
2146
2147 sub last_insert_id {
2148   my $self = shift;
2149   $self->_dbh_last_insert_id ($self->_dbh, @_);
2150 }
2151
2152 =head2 _native_data_type
2153
2154 =over 4
2155
2156 =item Arguments: $type_name
2157
2158 =back
2159
2160 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2161 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2162 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2163
2164 The default implementation returns C<undef>, implement in your Storage driver if
2165 you need this functionality.
2166
2167 Should map types from other databases to the native RDBMS type, for example
2168 C<VARCHAR2> to C<VARCHAR>.
2169
2170 Types with modifiers should map to the underlying data type. For example,
2171 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2172
2173 Composite types should map to the container type, for example
2174 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2175
2176 =cut
2177
2178 sub _native_data_type {
2179   #my ($self, $data_type) = @_;
2180   return undef
2181 }
2182
2183 # Check if placeholders are supported at all
2184 sub _placeholders_supported {
2185   my $self = shift;
2186   my $dbh  = $self->_get_dbh;
2187
2188   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2189   # but it is inaccurate more often than not
2190   return try {
2191     local $dbh->{PrintError} = 0;
2192     local $dbh->{RaiseError} = 1;
2193     $dbh->do('select ?', {}, 1);
2194     1;
2195   }
2196   catch {
2197     0;
2198   };
2199 }
2200
2201 # Check if placeholders bound to non-string types throw exceptions
2202 #
2203 sub _typeless_placeholders_supported {
2204   my $self = shift;
2205   my $dbh  = $self->_get_dbh;
2206
2207   return try {
2208     local $dbh->{PrintError} = 0;
2209     local $dbh->{RaiseError} = 1;
2210     # this specifically tests a bind that is NOT a string
2211     $dbh->do('select 1 where 1 = ?', {}, 1);
2212     1;
2213   }
2214   catch {
2215     0;
2216   };
2217 }
2218
2219 =head2 sqlt_type
2220
2221 Returns the database driver name.
2222
2223 =cut
2224
2225 sub sqlt_type {
2226   shift->_get_dbh->{Driver}->{Name};
2227 }
2228
2229 =head2 bind_attribute_by_data_type
2230
2231 Given a datatype from column info, returns a database specific bind
2232 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2233 let the database planner just handle it.
2234
2235 Generally only needed for special case column types, like bytea in postgres.
2236
2237 =cut
2238
2239 sub bind_attribute_by_data_type {
2240     return;
2241 }
2242
2243 =head2 is_datatype_numeric
2244
2245 Given a datatype from column_info, returns a boolean value indicating if
2246 the current RDBMS considers it a numeric value. This controls how
2247 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2248 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2249 be performed instead of the usual C<eq>.
2250
2251 =cut
2252
2253 sub is_datatype_numeric {
2254   my ($self, $dt) = @_;
2255
2256   return 0 unless $dt;
2257
2258   return $dt =~ /^ (?:
2259     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2260   ) $/ix;
2261 }
2262
2263
2264 =head2 create_ddl_dir
2265
2266 =over 4
2267
2268 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2269
2270 =back
2271
2272 Creates a SQL file based on the Schema, for each of the specified
2273 database engines in C<\@databases> in the given directory.
2274 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2275
2276 Given a previous version number, this will also create a file containing
2277 the ALTER TABLE statements to transform the previous schema into the
2278 current one. Note that these statements may contain C<DROP TABLE> or
2279 C<DROP COLUMN> statements that can potentially destroy data.
2280
2281 The file names are created using the C<ddl_filename> method below, please
2282 override this method in your schema if you would like a different file
2283 name format. For the ALTER file, the same format is used, replacing
2284 $version in the name with "$preversion-$version".
2285
2286 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2287 The most common value for this would be C<< { add_drop_table => 1 } >>
2288 to have the SQL produced include a C<DROP TABLE> statement for each table
2289 created. For quoting purposes supply C<quote_table_names> and
2290 C<quote_field_names>.
2291
2292 If no arguments are passed, then the following default values are assumed:
2293
2294 =over 4
2295
2296 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2297
2298 =item version    - $schema->schema_version
2299
2300 =item directory  - './'
2301
2302 =item preversion - <none>
2303
2304 =back
2305
2306 By default, C<\%sqlt_args> will have
2307
2308  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2309
2310 merged with the hash passed in. To disable any of those features, pass in a
2311 hashref like the following
2312
2313  { ignore_constraint_names => 0, # ... other options }
2314
2315
2316 WARNING: You are strongly advised to check all SQL files created, before applying
2317 them.
2318
2319 =cut
2320
2321 sub create_ddl_dir {
2322   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2323
2324   unless ($dir) {
2325     carp "No directory given, using ./\n";
2326     $dir = './';
2327   } else {
2328       -d $dir or File::Path::mkpath($dir)
2329           or $self->throw_exception("create_ddl_dir: $! creating dir '$dir'");
2330   }
2331
2332   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2333
2334   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2335   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2336
2337   my $schema_version = $schema->schema_version || '1.x';
2338   $version ||= $schema_version;
2339
2340   $sqltargs = {
2341     add_drop_table => 1,
2342     ignore_constraint_names => 1,
2343     ignore_index_names => 1,
2344     %{$sqltargs || {}}
2345   };
2346
2347   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2348     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2349   }
2350
2351   my $sqlt = SQL::Translator->new( $sqltargs );
2352
2353   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2354   my $sqlt_schema = $sqlt->translate({ data => $schema })
2355     or $self->throw_exception ($sqlt->error);
2356
2357   foreach my $db (@$databases) {
2358     $sqlt->reset();
2359     $sqlt->{schema} = $sqlt_schema;
2360     $sqlt->producer($db);
2361
2362     my $file;
2363     my $filename = $schema->ddl_filename($db, $version, $dir);
2364     if (-e $filename && ($version eq $schema_version )) {
2365       # if we are dumping the current version, overwrite the DDL
2366       carp "Overwriting existing DDL file - $filename";
2367       unlink($filename);
2368     }
2369
2370     my $output = $sqlt->translate;
2371     if(!$output) {
2372       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2373       next;
2374     }
2375     if(!open($file, ">$filename")) {
2376       $self->throw_exception("Can't open $filename for writing ($!)");
2377       next;
2378     }
2379     print $file $output;
2380     close($file);
2381
2382     next unless ($preversion);
2383
2384     require SQL::Translator::Diff;
2385
2386     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2387     if(!-e $prefilename) {
2388       carp("No previous schema file found ($prefilename)");
2389       next;
2390     }
2391
2392     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2393     if(-e $difffile) {
2394       carp("Overwriting existing diff file - $difffile");
2395       unlink($difffile);
2396     }
2397
2398     my $source_schema;
2399     {
2400       my $t = SQL::Translator->new($sqltargs);
2401       $t->debug( 0 );
2402       $t->trace( 0 );
2403
2404       $t->parser( $db )
2405         or $self->throw_exception ($t->error);
2406
2407       my $out = $t->translate( $prefilename )
2408         or $self->throw_exception ($t->error);
2409
2410       $source_schema = $t->schema;
2411
2412       $source_schema->name( $prefilename )
2413         unless ( $source_schema->name );
2414     }
2415
2416     # The "new" style of producers have sane normalization and can support
2417     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2418     # And we have to diff parsed SQL against parsed SQL.
2419     my $dest_schema = $sqlt_schema;
2420
2421     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2422       my $t = SQL::Translator->new($sqltargs);
2423       $t->debug( 0 );
2424       $t->trace( 0 );
2425
2426       $t->parser( $db )
2427         or $self->throw_exception ($t->error);
2428
2429       my $out = $t->translate( $filename )
2430         or $self->throw_exception ($t->error);
2431
2432       $dest_schema = $t->schema;
2433
2434       $dest_schema->name( $filename )
2435         unless $dest_schema->name;
2436     }
2437
2438     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2439                                                   $dest_schema,   $db,
2440                                                   $sqltargs
2441                                                  );
2442     if(!open $file, ">$difffile") {
2443       $self->throw_exception("Can't write to $difffile ($!)");
2444       next;
2445     }
2446     print $file $diff;
2447     close($file);
2448   }
2449 }
2450
2451 =head2 deployment_statements
2452
2453 =over 4
2454
2455 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2456
2457 =back
2458
2459 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2460
2461 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2462 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2463
2464 C<$directory> is used to return statements from files in a previously created
2465 L</create_ddl_dir> directory and is optional. The filenames are constructed
2466 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2467
2468 If no C<$directory> is specified then the statements are constructed on the
2469 fly using L<SQL::Translator> and C<$version> is ignored.
2470
2471 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2472
2473 =cut
2474
2475 sub deployment_statements {
2476   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2477   $type ||= $self->sqlt_type;
2478   $version ||= $schema->schema_version || '1.x';
2479   $dir ||= './';
2480   my $filename = $schema->ddl_filename($type, $version, $dir);
2481   if(-f $filename)
2482   {
2483       my $file;
2484       open($file, "<$filename")
2485         or $self->throw_exception("Can't open $filename ($!)");
2486       my @rows = <$file>;
2487       close($file);
2488       return join('', @rows);
2489   }
2490
2491   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2492     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2493   }
2494
2495   # sources needs to be a parser arg, but for simplicty allow at top level
2496   # coming in
2497   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2498       if exists $sqltargs->{sources};
2499
2500   my $tr = SQL::Translator->new(
2501     producer => "SQL::Translator::Producer::${type}",
2502     %$sqltargs,
2503     parser => 'SQL::Translator::Parser::DBIx::Class',
2504     data => $schema,
2505   );
2506
2507   my @ret;
2508   my $wa = wantarray;
2509   if ($wa) {
2510     @ret = $tr->translate;
2511   }
2512   else {
2513     $ret[0] = $tr->translate;
2514   }
2515
2516   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2517     unless (@ret && defined $ret[0]);
2518
2519   return $wa ? @ret : $ret[0];
2520 }
2521
2522 sub deploy {
2523   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2524   my $deploy = sub {
2525     my $line = shift;
2526     return if($line =~ /^--/);
2527     return if(!$line);
2528     # next if($line =~ /^DROP/m);
2529     return if($line =~ /^BEGIN TRANSACTION/m);
2530     return if($line =~ /^COMMIT/m);
2531     return if $line =~ /^\s+$/; # skip whitespace only
2532     $self->_query_start($line);
2533     try {
2534       # do a dbh_do cycle here, as we need some error checking in
2535       # place (even though we will ignore errors)
2536       $self->dbh_do (sub { $_[1]->do($line) });
2537     } catch {
2538       carp qq{$_ (running "${line}")};
2539     };
2540     $self->_query_end($line);
2541   };
2542   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2543   if (@statements > 1) {
2544     foreach my $statement (@statements) {
2545       $deploy->( $statement );
2546     }
2547   }
2548   elsif (@statements == 1) {
2549     foreach my $line ( split(";\n", $statements[0])) {
2550       $deploy->( $line );
2551     }
2552   }
2553 }
2554
2555 =head2 datetime_parser
2556
2557 Returns the datetime parser class
2558
2559 =cut
2560
2561 sub datetime_parser {
2562   my $self = shift;
2563   return $self->{datetime_parser} ||= do {
2564     $self->build_datetime_parser(@_);
2565   };
2566 }
2567
2568 =head2 datetime_parser_type
2569
2570 Defines (returns) the datetime parser class - currently hardwired to
2571 L<DateTime::Format::MySQL>
2572
2573 =cut
2574
2575 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2576
2577 =head2 build_datetime_parser
2578
2579 See L</datetime_parser>
2580
2581 =cut
2582
2583 sub build_datetime_parser {
2584   my $self = shift;
2585   my $type = $self->datetime_parser_type(@_);
2586   $self->ensure_class_loaded ($type);
2587   return $type;
2588 }
2589
2590
2591 =head2 is_replicating
2592
2593 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2594 replicate from a master database.  Default is undef, which is the result
2595 returned by databases that don't support replication.
2596
2597 =cut
2598
2599 sub is_replicating {
2600     return;
2601
2602 }
2603
2604 =head2 lag_behind_master
2605
2606 Returns a number that represents a certain amount of lag behind a master db
2607 when a given storage is replicating.  The number is database dependent, but
2608 starts at zero and increases with the amount of lag. Default in undef
2609
2610 =cut
2611
2612 sub lag_behind_master {
2613     return;
2614 }
2615
2616 =head2 relname_to_table_alias
2617
2618 =over 4
2619
2620 =item Arguments: $relname, $join_count
2621
2622 =back
2623
2624 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2625 queries.
2626
2627 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2628 way these aliases are named.
2629
2630 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2631 otherwise C<"$relname">.
2632
2633 =cut
2634
2635 sub relname_to_table_alias {
2636   my ($self, $relname, $join_count) = @_;
2637
2638   my $alias = ($join_count && $join_count > 1 ?
2639     join('_', $relname, $join_count) : $relname);
2640
2641   return $alias;
2642 }
2643
2644 1;
2645
2646 =head1 USAGE NOTES
2647
2648 =head2 DBIx::Class and AutoCommit
2649
2650 DBIx::Class can do some wonderful magic with handling exceptions,
2651 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2652 (the default) combined with C<txn_do> for transaction support.
2653
2654 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2655 in an assumed transaction between commits, and you're telling us you'd
2656 like to manage that manually.  A lot of the magic protections offered by
2657 this module will go away.  We can't protect you from exceptions due to database
2658 disconnects because we don't know anything about how to restart your
2659 transactions.  You're on your own for handling all sorts of exceptional
2660 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2661 be with raw DBI.
2662
2663
2664 =head1 AUTHORS
2665
2666 Matt S. Trout <mst@shadowcatsystems.co.uk>
2667
2668 Andy Grundman <andy@hybridized.org>
2669
2670 =head1 LICENSE
2671
2672 You may distribute this code under the same terms as Perl itself.
2673
2674 =cut