add sql_maker to @rdbms_specific_methods
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17 use Sub::Name ();
18
19 use File::Path ();
20
21 __PACKAGE__->mk_group_accessors('simple' =>
22   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
23      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints
24      _server_info_hash/
25 );
26
27 # the values for these accessors are picked out (and deleted) from
28 # the attribute hashref passed to connect_info
29 my @storage_options = qw/
30   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
31   disable_sth_caching unsafe auto_savepoint
32 /;
33 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
34
35
36 # default cursor class, overridable in connect_info attributes
37 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
38
39 __PACKAGE__->mk_group_accessors('inherited' => qw/
40   sql_maker_class
41   _supports_insert_returning
42 /);
43 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
44
45 # Each of these methods need _determine_driver called before itself
46 # in order to function reliably. This is a purely DRY optimization
47 my @rdbms_specific_methods = qw/
48   deployment_statements
49   sqlt_type
50   sql_maker
51   build_datetime_parser
52   datetime_parser_type
53
54   insert
55   insert_bulk
56   update
57   delete
58   select
59   select_single
60 /;
61
62 for my $meth (@rdbms_specific_methods) {
63
64   my $orig = __PACKAGE__->can ($meth)
65     or next;
66
67   no strict qw/refs/;
68   no warnings qw/redefine/;
69   *{__PACKAGE__ ."::$meth"} = Sub::Name::subname $meth => sub {
70     if (not $_[0]->_driver_determined) {
71       $_[0]->_determine_driver;
72       goto $_[0]->can($meth);
73     }
74     $orig->(@_);
75   };
76 }
77
78
79 =head1 NAME
80
81 DBIx::Class::Storage::DBI - DBI storage handler
82
83 =head1 SYNOPSIS
84
85   my $schema = MySchema->connect('dbi:SQLite:my.db');
86
87   $schema->storage->debug(1);
88
89   my @stuff = $schema->storage->dbh_do(
90     sub {
91       my ($storage, $dbh, @args) = @_;
92       $dbh->do("DROP TABLE authors");
93     },
94     @column_list
95   );
96
97   $schema->resultset('Book')->search({
98      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
99   });
100
101 =head1 DESCRIPTION
102
103 This class represents the connection to an RDBMS via L<DBI>.  See
104 L<DBIx::Class::Storage> for general information.  This pod only
105 documents DBI-specific methods and behaviors.
106
107 =head1 METHODS
108
109 =cut
110
111 sub new {
112   my $new = shift->next::method(@_);
113
114   $new->transaction_depth(0);
115   $new->_sql_maker_opts({});
116   $new->{savepoints} = [];
117   $new->{_in_dbh_do} = 0;
118   $new->{_dbh_gen} = 0;
119
120   # read below to see what this does
121   $new->_arm_global_destructor;
122
123   $new;
124 }
125
126 # This is hack to work around perl shooting stuff in random
127 # order on exit(). If we do not walk the remaining storage
128 # objects in an END block, there is a *small but real* chance
129 # of a fork()ed child to kill the parent's shared DBI handle,
130 # *before perl reaches the DESTROY in this package*
131 # Yes, it is ugly and effective.
132 {
133   my %seek_and_destroy;
134
135   sub _arm_global_destructor {
136     my $self = shift;
137     my $key = Scalar::Util::refaddr ($self);
138     $seek_and_destroy{$key} = $self;
139     Scalar::Util::weaken ($seek_and_destroy{$key});
140   }
141
142   END {
143     local $?; # just in case the DBI destructor changes it somehow
144
145     # destroy just the object if not native to this process/thread
146     $_->_preserve_foreign_dbh for (grep
147       { defined $_ }
148       values %seek_and_destroy
149     );
150   }
151 }
152
153 sub DESTROY {
154   my $self = shift;
155
156   # destroy just the object if not native to this process/thread
157   $self->_preserve_foreign_dbh;
158
159   # some databases need this to stop spewing warnings
160   if (my $dbh = $self->_dbh) {
161     local $@;
162     eval {
163       %{ $dbh->{CachedKids} } = ();
164       $dbh->disconnect;
165     };
166   }
167
168   $self->_dbh(undef);
169 }
170
171 sub _preserve_foreign_dbh {
172   my $self = shift;
173
174   return unless $self->_dbh;
175
176   $self->_verify_tid;
177
178   return unless $self->_dbh;
179
180   $self->_verify_pid;
181
182 }
183
184 # handle pid changes correctly - do not destroy parent's connection
185 sub _verify_pid {
186   my $self = shift;
187
188   return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
189
190   $self->_dbh->{InactiveDestroy} = 1;
191   $self->_dbh(undef);
192   $self->{_dbh_gen}++;
193
194   return;
195 }
196
197 # very similar to above, but seems to FAIL if I set InactiveDestroy
198 sub _verify_tid {
199   my $self = shift;
200
201   if ( ! defined $self->_conn_tid ) {
202     return; # no threads
203   }
204   elsif ( $self->_conn_tid == threads->tid ) {
205     return; # same thread
206   }
207
208   #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
209   $self->_dbh(undef);
210   $self->{_dbh_gen}++;
211
212   return;
213 }
214
215
216 =head2 connect_info
217
218 This method is normally called by L<DBIx::Class::Schema/connection>, which
219 encapsulates its argument list in an arrayref before passing them here.
220
221 The argument list may contain:
222
223 =over
224
225 =item *
226
227 The same 4-element argument set one would normally pass to
228 L<DBI/connect>, optionally followed by
229 L<extra attributes|/DBIx::Class specific connection attributes>
230 recognized by DBIx::Class:
231
232   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
233
234 =item *
235
236 A single code reference which returns a connected
237 L<DBI database handle|DBI/connect> optionally followed by
238 L<extra attributes|/DBIx::Class specific connection attributes> recognized
239 by DBIx::Class:
240
241   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
242
243 =item *
244
245 A single hashref with all the attributes and the dsn/user/password
246 mixed together:
247
248   $connect_info_args = [{
249     dsn => $dsn,
250     user => $user,
251     password => $pass,
252     %dbi_attributes,
253     %extra_attributes,
254   }];
255
256   $connect_info_args = [{
257     dbh_maker => sub { DBI->connect (...) },
258     %dbi_attributes,
259     %extra_attributes,
260   }];
261
262 This is particularly useful for L<Catalyst> based applications, allowing the
263 following config (L<Config::General> style):
264
265   <Model::DB>
266     schema_class   App::DB
267     <connect_info>
268       dsn          dbi:mysql:database=test
269       user         testuser
270       password     TestPass
271       AutoCommit   1
272     </connect_info>
273   </Model::DB>
274
275 The C<dsn>/C<user>/C<password> combination can be substituted by the
276 C<dbh_maker> key whose value is a coderef that returns a connected
277 L<DBI database handle|DBI/connect>
278
279 =back
280
281 Please note that the L<DBI> docs recommend that you always explicitly
282 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
283 recommends that it be set to I<1>, and that you perform transactions
284 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
285 to I<1> if you do not do explicitly set it to zero.  This is the default
286 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
287
288 =head3 DBIx::Class specific connection attributes
289
290 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
291 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
292 the following connection options. These options can be mixed in with your other
293 L<DBI> connection attributes, or placed in a separate hashref
294 (C<\%extra_attributes>) as shown above.
295
296 Every time C<connect_info> is invoked, any previous settings for
297 these options will be cleared before setting the new ones, regardless of
298 whether any options are specified in the new C<connect_info>.
299
300
301 =over
302
303 =item on_connect_do
304
305 Specifies things to do immediately after connecting or re-connecting to
306 the database.  Its value may contain:
307
308 =over
309
310 =item a scalar
311
312 This contains one SQL statement to execute.
313
314 =item an array reference
315
316 This contains SQL statements to execute in order.  Each element contains
317 a string or a code reference that returns a string.
318
319 =item a code reference
320
321 This contains some code to execute.  Unlike code references within an
322 array reference, its return value is ignored.
323
324 =back
325
326 =item on_disconnect_do
327
328 Takes arguments in the same form as L</on_connect_do> and executes them
329 immediately before disconnecting from the database.
330
331 Note, this only runs if you explicitly call L</disconnect> on the
332 storage object.
333
334 =item on_connect_call
335
336 A more generalized form of L</on_connect_do> that calls the specified
337 C<connect_call_METHOD> methods in your storage driver.
338
339   on_connect_do => 'select 1'
340
341 is equivalent to:
342
343   on_connect_call => [ [ do_sql => 'select 1' ] ]
344
345 Its values may contain:
346
347 =over
348
349 =item a scalar
350
351 Will call the C<connect_call_METHOD> method.
352
353 =item a code reference
354
355 Will execute C<< $code->($storage) >>
356
357 =item an array reference
358
359 Each value can be a method name or code reference.
360
361 =item an array of arrays
362
363 For each array, the first item is taken to be the C<connect_call_> method name
364 or code reference, and the rest are parameters to it.
365
366 =back
367
368 Some predefined storage methods you may use:
369
370 =over
371
372 =item do_sql
373
374 Executes a SQL string or a code reference that returns a SQL string. This is
375 what L</on_connect_do> and L</on_disconnect_do> use.
376
377 It can take:
378
379 =over
380
381 =item a scalar
382
383 Will execute the scalar as SQL.
384
385 =item an arrayref
386
387 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
388 attributes hashref and bind values.
389
390 =item a code reference
391
392 Will execute C<< $code->($storage) >> and execute the return array refs as
393 above.
394
395 =back
396
397 =item datetime_setup
398
399 Execute any statements necessary to initialize the database session to return
400 and accept datetime/timestamp values used with
401 L<DBIx::Class::InflateColumn::DateTime>.
402
403 Only necessary for some databases, see your specific storage driver for
404 implementation details.
405
406 =back
407
408 =item on_disconnect_call
409
410 Takes arguments in the same form as L</on_connect_call> and executes them
411 immediately before disconnecting from the database.
412
413 Calls the C<disconnect_call_METHOD> methods as opposed to the
414 C<connect_call_METHOD> methods called by L</on_connect_call>.
415
416 Note, this only runs if you explicitly call L</disconnect> on the
417 storage object.
418
419 =item disable_sth_caching
420
421 If set to a true value, this option will disable the caching of
422 statement handles via L<DBI/prepare_cached>.
423
424 =item limit_dialect
425
426 Sets the limit dialect. This is useful for JDBC-bridge among others
427 where the remote SQL-dialect cannot be determined by the name of the
428 driver alone. See also L<SQL::Abstract::Limit>.
429
430 =item quote_char
431
432 Specifies what characters to use to quote table and column names. If
433 you use this you will want to specify L</name_sep> as well.
434
435 C<quote_char> expects either a single character, in which case is it
436 is placed on either side of the table/column name, or an arrayref of length
437 2 in which case the table/column name is placed between the elements.
438
439 For example under MySQL you should use C<< quote_char => '`' >>, and for
440 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
441
442 =item name_sep
443
444 This only needs to be used in conjunction with C<quote_char>, and is used to
445 specify the character that separates elements (schemas, tables, columns) from
446 each other. In most cases this is simply a C<.>.
447
448 The consequences of not supplying this value is that L<SQL::Abstract>
449 will assume DBIx::Class' uses of aliases to be complete column
450 names. The output will look like I<"me.name"> when it should actually
451 be I<"me"."name">.
452
453 =item unsafe
454
455 This Storage driver normally installs its own C<HandleError>, sets
456 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
457 all database handles, including those supplied by a coderef.  It does this
458 so that it can have consistent and useful error behavior.
459
460 If you set this option to a true value, Storage will not do its usual
461 modifications to the database handle's attributes, and instead relies on
462 the settings in your connect_info DBI options (or the values you set in
463 your connection coderef, in the case that you are connecting via coderef).
464
465 Note that your custom settings can cause Storage to malfunction,
466 especially if you set a C<HandleError> handler that suppresses exceptions
467 and/or disable C<RaiseError>.
468
469 =item auto_savepoint
470
471 If this option is true, L<DBIx::Class> will use savepoints when nesting
472 transactions, making it possible to recover from failure in the inner
473 transaction without having to abort all outer transactions.
474
475 =item cursor_class
476
477 Use this argument to supply a cursor class other than the default
478 L<DBIx::Class::Storage::DBI::Cursor>.
479
480 =back
481
482 Some real-life examples of arguments to L</connect_info> and
483 L<DBIx::Class::Schema/connect>
484
485   # Simple SQLite connection
486   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
487
488   # Connect via subref
489   ->connect_info([ sub { DBI->connect(...) } ]);
490
491   # Connect via subref in hashref
492   ->connect_info([{
493     dbh_maker => sub { DBI->connect(...) },
494     on_connect_do => 'alter session ...',
495   }]);
496
497   # A bit more complicated
498   ->connect_info(
499     [
500       'dbi:Pg:dbname=foo',
501       'postgres',
502       'my_pg_password',
503       { AutoCommit => 1 },
504       { quote_char => q{"}, name_sep => q{.} },
505     ]
506   );
507
508   # Equivalent to the previous example
509   ->connect_info(
510     [
511       'dbi:Pg:dbname=foo',
512       'postgres',
513       'my_pg_password',
514       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
515     ]
516   );
517
518   # Same, but with hashref as argument
519   # See parse_connect_info for explanation
520   ->connect_info(
521     [{
522       dsn         => 'dbi:Pg:dbname=foo',
523       user        => 'postgres',
524       password    => 'my_pg_password',
525       AutoCommit  => 1,
526       quote_char  => q{"},
527       name_sep    => q{.},
528     }]
529   );
530
531   # Subref + DBIx::Class-specific connection options
532   ->connect_info(
533     [
534       sub { DBI->connect(...) },
535       {
536           quote_char => q{`},
537           name_sep => q{@},
538           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
539           disable_sth_caching => 1,
540       },
541     ]
542   );
543
544
545
546 =cut
547
548 sub connect_info {
549   my ($self, $info) = @_;
550
551   return $self->_connect_info if !$info;
552
553   $self->_connect_info($info); # copy for _connect_info
554
555   $info = $self->_normalize_connect_info($info)
556     if ref $info eq 'ARRAY';
557
558   for my $storage_opt (keys %{ $info->{storage_options} }) {
559     my $value = $info->{storage_options}{$storage_opt};
560
561     $self->$storage_opt($value);
562   }
563
564   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
565   #  the new set of options
566   $self->_sql_maker(undef);
567   $self->_sql_maker_opts({});
568
569   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
570     my $value = $info->{sql_maker_options}{$sql_maker_opt};
571
572     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
573   }
574
575   my %attrs = (
576     %{ $self->_default_dbi_connect_attributes || {} },
577     %{ $info->{attributes} || {} },
578   );
579
580   my @args = @{ $info->{arguments} };
581
582   $self->_dbi_connect_info([@args,
583     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
584
585   return $self->_connect_info;
586 }
587
588 sub _normalize_connect_info {
589   my ($self, $info_arg) = @_;
590   my %info;
591
592   my @args = @$info_arg;  # take a shallow copy for further mutilation
593
594   # combine/pre-parse arguments depending on invocation style
595
596   my %attrs;
597   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
598     %attrs = %{ $args[1] || {} };
599     @args = $args[0];
600   }
601   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
602     %attrs = %{$args[0]};
603     @args = ();
604     if (my $code = delete $attrs{dbh_maker}) {
605       @args = $code;
606
607       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
608       if (@ignored) {
609         carp sprintf (
610             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
611           . "to the result of 'dbh_maker'",
612
613           join (', ', map { "'$_'" } (@ignored) ),
614         );
615       }
616     }
617     else {
618       @args = delete @attrs{qw/dsn user password/};
619     }
620   }
621   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
622     %attrs = (
623       % { $args[3] || {} },
624       % { $args[4] || {} },
625     );
626     @args = @args[0,1,2];
627   }
628
629   $info{arguments} = \@args;
630
631   my @storage_opts = grep exists $attrs{$_},
632     @storage_options, 'cursor_class';
633
634   @{ $info{storage_options} }{@storage_opts} =
635     delete @attrs{@storage_opts} if @storage_opts;
636
637   my @sql_maker_opts = grep exists $attrs{$_},
638     qw/limit_dialect quote_char name_sep/;
639
640   @{ $info{sql_maker_options} }{@sql_maker_opts} =
641     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
642
643   $info{attributes} = \%attrs if %attrs;
644
645   return \%info;
646 }
647
648 sub _default_dbi_connect_attributes {
649   return {
650     AutoCommit => 1,
651     RaiseError => 1,
652     PrintError => 0,
653   };
654 }
655
656 =head2 on_connect_do
657
658 This method is deprecated in favour of setting via L</connect_info>.
659
660 =cut
661
662 =head2 on_disconnect_do
663
664 This method is deprecated in favour of setting via L</connect_info>.
665
666 =cut
667
668 sub _parse_connect_do {
669   my ($self, $type) = @_;
670
671   my $val = $self->$type;
672   return () if not defined $val;
673
674   my @res;
675
676   if (not ref($val)) {
677     push @res, [ 'do_sql', $val ];
678   } elsif (ref($val) eq 'CODE') {
679     push @res, $val;
680   } elsif (ref($val) eq 'ARRAY') {
681     push @res, map { [ 'do_sql', $_ ] } @$val;
682   } else {
683     $self->throw_exception("Invalid type for $type: ".ref($val));
684   }
685
686   return \@res;
687 }
688
689 =head2 dbh_do
690
691 Arguments: ($subref | $method_name), @extra_coderef_args?
692
693 Execute the given $subref or $method_name using the new exception-based
694 connection management.
695
696 The first two arguments will be the storage object that C<dbh_do> was called
697 on and a database handle to use.  Any additional arguments will be passed
698 verbatim to the called subref as arguments 2 and onwards.
699
700 Using this (instead of $self->_dbh or $self->dbh) ensures correct
701 exception handling and reconnection (or failover in future subclasses).
702
703 Your subref should have no side-effects outside of the database, as
704 there is the potential for your subref to be partially double-executed
705 if the database connection was stale/dysfunctional.
706
707 Example:
708
709   my @stuff = $schema->storage->dbh_do(
710     sub {
711       my ($storage, $dbh, @cols) = @_;
712       my $cols = join(q{, }, @cols);
713       $dbh->selectrow_array("SELECT $cols FROM foo");
714     },
715     @column_list
716   );
717
718 =cut
719
720 sub dbh_do {
721   my $self = shift;
722   my $code = shift;
723
724   my $dbh = $self->_get_dbh;
725
726   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
727       || $self->{transaction_depth};
728
729   local $self->{_in_dbh_do} = 1;
730
731   my @result;
732   my $want_array = wantarray;
733
734   eval {
735
736     if($want_array) {
737         @result = $self->$code($dbh, @_);
738     }
739     elsif(defined $want_array) {
740         $result[0] = $self->$code($dbh, @_);
741     }
742     else {
743         $self->$code($dbh, @_);
744     }
745   };
746
747   # ->connected might unset $@ - copy
748   my $exception = $@;
749   if(!$exception) { return $want_array ? @result : $result[0] }
750
751   $self->throw_exception($exception) if $self->connected;
752
753   # We were not connected - reconnect and retry, but let any
754   #  exception fall right through this time
755   carp "Retrying $code after catching disconnected exception: $exception"
756     if $ENV{DBIC_DBIRETRY_DEBUG};
757   $self->_populate_dbh;
758   $self->$code($self->_dbh, @_);
759 }
760
761 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
762 # It also informs dbh_do to bypass itself while under the direction of txn_do,
763 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
764 sub txn_do {
765   my $self = shift;
766   my $coderef = shift;
767
768   ref $coderef eq 'CODE' or $self->throw_exception
769     ('$coderef must be a CODE reference');
770
771   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
772
773   local $self->{_in_dbh_do} = 1;
774
775   my @result;
776   my $want_array = wantarray;
777
778   my $tried = 0;
779   while(1) {
780     eval {
781       $self->_get_dbh;
782
783       $self->txn_begin;
784       if($want_array) {
785           @result = $coderef->(@_);
786       }
787       elsif(defined $want_array) {
788           $result[0] = $coderef->(@_);
789       }
790       else {
791           $coderef->(@_);
792       }
793       $self->txn_commit;
794     };
795
796     # ->connected might unset $@ - copy
797     my $exception = $@;
798     if(!$exception) { return $want_array ? @result : $result[0] }
799
800     if($tried++ || $self->connected) {
801       eval { $self->txn_rollback };
802       my $rollback_exception = $@;
803       if($rollback_exception) {
804         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
805         $self->throw_exception($exception)  # propagate nested rollback
806           if $rollback_exception =~ /$exception_class/;
807
808         $self->throw_exception(
809           "Transaction aborted: ${exception}. "
810           . "Rollback failed: ${rollback_exception}"
811         );
812       }
813       $self->throw_exception($exception)
814     }
815
816     # We were not connected, and was first try - reconnect and retry
817     # via the while loop
818     carp "Retrying $coderef after catching disconnected exception: $exception"
819       if $ENV{DBIC_DBIRETRY_DEBUG};
820     $self->_populate_dbh;
821   }
822 }
823
824 =head2 disconnect
825
826 Our C<disconnect> method also performs a rollback first if the
827 database is not in C<AutoCommit> mode.
828
829 =cut
830
831 sub disconnect {
832   my ($self) = @_;
833
834   if( $self->_dbh ) {
835     my @actions;
836
837     push @actions, ( $self->on_disconnect_call || () );
838     push @actions, $self->_parse_connect_do ('on_disconnect_do');
839
840     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
841
842     $self->_dbh_rollback unless $self->_dbh_autocommit;
843
844     %{ $self->_dbh->{CachedKids} } = ();
845     $self->_dbh->disconnect;
846     $self->_dbh(undef);
847     $self->{_dbh_gen}++;
848   }
849 }
850
851 =head2 with_deferred_fk_checks
852
853 =over 4
854
855 =item Arguments: C<$coderef>
856
857 =item Return Value: The return value of $coderef
858
859 =back
860
861 Storage specific method to run the code ref with FK checks deferred or
862 in MySQL's case disabled entirely.
863
864 =cut
865
866 # Storage subclasses should override this
867 sub with_deferred_fk_checks {
868   my ($self, $sub) = @_;
869   $sub->();
870 }
871
872 =head2 connected
873
874 =over
875
876 =item Arguments: none
877
878 =item Return Value: 1|0
879
880 =back
881
882 Verifies that the current database handle is active and ready to execute
883 an SQL statement (e.g. the connection did not get stale, server is still
884 answering, etc.) This method is used internally by L</dbh>.
885
886 =cut
887
888 sub connected {
889   my $self = shift;
890   return 0 unless $self->_seems_connected;
891
892   #be on the safe side
893   local $self->_dbh->{RaiseError} = 1;
894
895   return $self->_ping;
896 }
897
898 sub _seems_connected {
899   my $self = shift;
900
901   $self->_preserve_foreign_dbh;
902
903   my $dbh = $self->_dbh
904     or return 0;
905
906   return $dbh->FETCH('Active');
907 }
908
909 sub _ping {
910   my $self = shift;
911
912   my $dbh = $self->_dbh or return 0;
913
914   return $dbh->ping;
915 }
916
917 sub ensure_connected {
918   my ($self) = @_;
919
920   unless ($self->connected) {
921     $self->_populate_dbh;
922   }
923 }
924
925 =head2 dbh
926
927 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
928 is guaranteed to be healthy by implicitly calling L</connected>, and if
929 necessary performing a reconnection before returning. Keep in mind that this
930 is very B<expensive> on some database engines. Consider using L</dbh_do>
931 instead.
932
933 =cut
934
935 sub dbh {
936   my ($self) = @_;
937
938   if (not $self->_dbh) {
939     $self->_populate_dbh;
940   } else {
941     $self->ensure_connected;
942   }
943   return $self->_dbh;
944 }
945
946 # this is the internal "get dbh or connect (don't check)" method
947 sub _get_dbh {
948   my $self = shift;
949   $self->_preserve_foreign_dbh;
950   $self->_populate_dbh unless $self->_dbh;
951   return $self->_dbh;
952 }
953
954 sub _sql_maker_args {
955     my ($self) = @_;
956
957     return (
958       bindtype=>'columns',
959       array_datatypes => 1,
960       limit_dialect => $self->_get_dbh,
961       %{$self->_sql_maker_opts}
962     );
963 }
964
965 sub sql_maker {
966   my ($self) = @_;
967   unless ($self->_sql_maker) {
968     my $sql_maker_class = $self->sql_maker_class;
969     $self->ensure_class_loaded ($sql_maker_class);
970     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
971   }
972   return $self->_sql_maker;
973 }
974
975 # nothing to do by default
976 sub _rebless {}
977 sub _init {}
978
979 sub _populate_dbh {
980   my ($self) = @_;
981
982   my @info = @{$self->_dbi_connect_info || []};
983   $self->_dbh(undef); # in case ->connected failed we might get sent here
984   $self->_server_info_hash (undef);
985   $self->_dbh($self->_connect(@info));
986
987   $self->_conn_pid($$);
988   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
989
990   $self->_determine_driver;
991
992   # Always set the transaction depth on connect, since
993   #  there is no transaction in progress by definition
994   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
995
996   $self->_run_connection_actions unless $self->{_in_determine_driver};
997 }
998
999 sub _run_connection_actions {
1000   my $self = shift;
1001   my @actions;
1002
1003   push @actions, ( $self->on_connect_call || () );
1004   push @actions, $self->_parse_connect_do ('on_connect_do');
1005
1006   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1007 }
1008
1009 sub _server_info {
1010   my $self = shift;
1011
1012   unless ($self->_server_info_hash) {
1013
1014     my %info;
1015
1016     my $server_version = do {
1017       local $@; # might be happenin in some sort of destructor
1018       eval { $self->_get_server_version };
1019     };
1020
1021     if (defined $server_version) {
1022       $info{dbms_version} = $server_version;
1023
1024       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1025       my @verparts = split (/\./, $numeric_version);
1026       if (
1027         @verparts
1028           &&
1029         $verparts[0] <= 999
1030       ) {
1031         # consider only up to 3 version parts, iff not more than 3 digits
1032         my @use_parts;
1033         while (@verparts && @use_parts < 3) {
1034           my $p = shift @verparts;
1035           last if $p > 999;
1036           push @use_parts, $p;
1037         }
1038         push @use_parts, 0 while @use_parts < 3;
1039
1040         $info{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1041       }
1042     }
1043
1044     $self->_server_info_hash(\%info);
1045   }
1046
1047   return $self->_server_info_hash
1048 }
1049
1050 sub _get_server_version {
1051   shift->_get_dbh->get_info(18);
1052 }
1053
1054 sub _determine_driver {
1055   my ($self) = @_;
1056
1057   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1058     my $started_connected = 0;
1059     local $self->{_in_determine_driver} = 1;
1060
1061     if (ref($self) eq __PACKAGE__) {
1062       my $driver;
1063       if ($self->_dbh) { # we are connected
1064         $driver = $self->_dbh->{Driver}{Name};
1065         $started_connected = 1;
1066       } else {
1067         # if connect_info is a CODEREF, we have no choice but to connect
1068         if (ref $self->_dbi_connect_info->[0] &&
1069             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
1070           $self->_populate_dbh;
1071           $driver = $self->_dbh->{Driver}{Name};
1072         }
1073         else {
1074           # try to use dsn to not require being connected, the driver may still
1075           # force a connection in _rebless to determine version
1076           # (dsn may not be supplied at all if all we do is make a mock-schema)
1077           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1078           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1079           $driver ||= $ENV{DBI_DRIVER};
1080         }
1081       }
1082
1083       if ($driver) {
1084         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1085         if ($self->load_optional_class($storage_class)) {
1086           mro::set_mro($storage_class, 'c3');
1087           bless $self, $storage_class;
1088           $self->_rebless();
1089         }
1090       }
1091     }
1092
1093     $self->_driver_determined(1);
1094
1095     $self->_init; # run driver-specific initializations
1096
1097     $self->_run_connection_actions
1098         if !$started_connected && defined $self->_dbh;
1099   }
1100 }
1101
1102 sub _do_connection_actions {
1103   my $self          = shift;
1104   my $method_prefix = shift;
1105   my $call          = shift;
1106
1107   if (not ref($call)) {
1108     my $method = $method_prefix . $call;
1109     $self->$method(@_);
1110   } elsif (ref($call) eq 'CODE') {
1111     $self->$call(@_);
1112   } elsif (ref($call) eq 'ARRAY') {
1113     if (ref($call->[0]) ne 'ARRAY') {
1114       $self->_do_connection_actions($method_prefix, $_) for @$call;
1115     } else {
1116       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1117     }
1118   } else {
1119     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1120   }
1121
1122   return $self;
1123 }
1124
1125 sub connect_call_do_sql {
1126   my $self = shift;
1127   $self->_do_query(@_);
1128 }
1129
1130 sub disconnect_call_do_sql {
1131   my $self = shift;
1132   $self->_do_query(@_);
1133 }
1134
1135 # override in db-specific backend when necessary
1136 sub connect_call_datetime_setup { 1 }
1137
1138 sub _do_query {
1139   my ($self, $action) = @_;
1140
1141   if (ref $action eq 'CODE') {
1142     $action = $action->($self);
1143     $self->_do_query($_) foreach @$action;
1144   }
1145   else {
1146     # Most debuggers expect ($sql, @bind), so we need to exclude
1147     # the attribute hash which is the second argument to $dbh->do
1148     # furthermore the bind values are usually to be presented
1149     # as named arrayref pairs, so wrap those here too
1150     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1151     my $sql = shift @do_args;
1152     my $attrs = shift @do_args;
1153     my @bind = map { [ undef, $_ ] } @do_args;
1154
1155     $self->_query_start($sql, @bind);
1156     $self->_get_dbh->do($sql, $attrs, @do_args);
1157     $self->_query_end($sql, @bind);
1158   }
1159
1160   return $self;
1161 }
1162
1163 sub _connect {
1164   my ($self, @info) = @_;
1165
1166   $self->throw_exception("You failed to provide any connection info")
1167     if !@info;
1168
1169   my ($old_connect_via, $dbh);
1170
1171   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1172     $old_connect_via = $DBI::connect_via;
1173     $DBI::connect_via = 'connect';
1174   }
1175
1176   eval {
1177     if(ref $info[0] eq 'CODE') {
1178        $dbh = $info[0]->();
1179     }
1180     else {
1181        $dbh = DBI->connect(@info);
1182     }
1183
1184     if($dbh && !$self->unsafe) {
1185       my $weak_self = $self;
1186       Scalar::Util::weaken($weak_self);
1187       $dbh->{HandleError} = sub {
1188           if ($weak_self) {
1189             $weak_self->throw_exception("DBI Exception: $_[0]");
1190           }
1191           else {
1192             # the handler may be invoked by something totally out of
1193             # the scope of DBIC
1194             croak ("DBI Exception: $_[0]");
1195           }
1196       };
1197       $dbh->{ShowErrorStatement} = 1;
1198       $dbh->{RaiseError} = 1;
1199       $dbh->{PrintError} = 0;
1200     }
1201   };
1202
1203   $DBI::connect_via = $old_connect_via if $old_connect_via;
1204
1205   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1206     if !$dbh || $@;
1207
1208   $self->_dbh_autocommit($dbh->{AutoCommit});
1209
1210   $dbh;
1211 }
1212
1213 sub svp_begin {
1214   my ($self, $name) = @_;
1215
1216   $name = $self->_svp_generate_name
1217     unless defined $name;
1218
1219   $self->throw_exception ("You can't use savepoints outside a transaction")
1220     if $self->{transaction_depth} == 0;
1221
1222   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1223     unless $self->can('_svp_begin');
1224
1225   push @{ $self->{savepoints} }, $name;
1226
1227   $self->debugobj->svp_begin($name) if $self->debug;
1228
1229   return $self->_svp_begin($name);
1230 }
1231
1232 sub svp_release {
1233   my ($self, $name) = @_;
1234
1235   $self->throw_exception ("You can't use savepoints outside a transaction")
1236     if $self->{transaction_depth} == 0;
1237
1238   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1239     unless $self->can('_svp_release');
1240
1241   if (defined $name) {
1242     $self->throw_exception ("Savepoint '$name' does not exist")
1243       unless grep { $_ eq $name } @{ $self->{savepoints} };
1244
1245     # Dig through the stack until we find the one we are releasing.  This keeps
1246     # the stack up to date.
1247     my $svp;
1248
1249     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1250   } else {
1251     $name = pop @{ $self->{savepoints} };
1252   }
1253
1254   $self->debugobj->svp_release($name) if $self->debug;
1255
1256   return $self->_svp_release($name);
1257 }
1258
1259 sub svp_rollback {
1260   my ($self, $name) = @_;
1261
1262   $self->throw_exception ("You can't use savepoints outside a transaction")
1263     if $self->{transaction_depth} == 0;
1264
1265   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1266     unless $self->can('_svp_rollback');
1267
1268   if (defined $name) {
1269       # If they passed us a name, verify that it exists in the stack
1270       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1271           $self->throw_exception("Savepoint '$name' does not exist!");
1272       }
1273
1274       # Dig through the stack until we find the one we are releasing.  This keeps
1275       # the stack up to date.
1276       while(my $s = pop(@{ $self->{savepoints} })) {
1277           last if($s eq $name);
1278       }
1279       # Add the savepoint back to the stack, as a rollback doesn't remove the
1280       # named savepoint, only everything after it.
1281       push(@{ $self->{savepoints} }, $name);
1282   } else {
1283       # We'll assume they want to rollback to the last savepoint
1284       $name = $self->{savepoints}->[-1];
1285   }
1286
1287   $self->debugobj->svp_rollback($name) if $self->debug;
1288
1289   return $self->_svp_rollback($name);
1290 }
1291
1292 sub _svp_generate_name {
1293     my ($self) = @_;
1294
1295     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1296 }
1297
1298 sub txn_begin {
1299   my $self = shift;
1300
1301   # this means we have not yet connected and do not know the AC status
1302   # (e.g. coderef $dbh)
1303   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1304
1305   if($self->{transaction_depth} == 0) {
1306     $self->debugobj->txn_begin()
1307       if $self->debug;
1308     $self->_dbh_begin_work;
1309   }
1310   elsif ($self->auto_savepoint) {
1311     $self->svp_begin;
1312   }
1313   $self->{transaction_depth}++;
1314 }
1315
1316 sub _dbh_begin_work {
1317   my $self = shift;
1318
1319   # if the user is utilizing txn_do - good for him, otherwise we need to
1320   # ensure that the $dbh is healthy on BEGIN.
1321   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1322   # will be replaced by a failure of begin_work itself (which will be
1323   # then retried on reconnect)
1324   if ($self->{_in_dbh_do}) {
1325     $self->_dbh->begin_work;
1326   } else {
1327     $self->dbh_do(sub { $_[1]->begin_work });
1328   }
1329 }
1330
1331 sub txn_commit {
1332   my $self = shift;
1333   if ($self->{transaction_depth} == 1) {
1334     $self->debugobj->txn_commit()
1335       if ($self->debug);
1336     $self->_dbh_commit;
1337     $self->{transaction_depth} = 0
1338       if $self->_dbh_autocommit;
1339   }
1340   elsif($self->{transaction_depth} > 1) {
1341     $self->{transaction_depth}--;
1342     $self->svp_release
1343       if $self->auto_savepoint;
1344   }
1345 }
1346
1347 sub _dbh_commit {
1348   my $self = shift;
1349   my $dbh  = $self->_dbh
1350     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1351   $dbh->commit;
1352 }
1353
1354 sub txn_rollback {
1355   my $self = shift;
1356   my $dbh = $self->_dbh;
1357   eval {
1358     if ($self->{transaction_depth} == 1) {
1359       $self->debugobj->txn_rollback()
1360         if ($self->debug);
1361       $self->{transaction_depth} = 0
1362         if $self->_dbh_autocommit;
1363       $self->_dbh_rollback;
1364     }
1365     elsif($self->{transaction_depth} > 1) {
1366       $self->{transaction_depth}--;
1367       if ($self->auto_savepoint) {
1368         $self->svp_rollback;
1369         $self->svp_release;
1370       }
1371     }
1372     else {
1373       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1374     }
1375   };
1376   if ($@) {
1377     my $error = $@;
1378     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1379     $error =~ /$exception_class/ and $self->throw_exception($error);
1380     # ensure that a failed rollback resets the transaction depth
1381     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1382     $self->throw_exception($error);
1383   }
1384 }
1385
1386 sub _dbh_rollback {
1387   my $self = shift;
1388   my $dbh  = $self->_dbh
1389     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1390   $dbh->rollback;
1391 }
1392
1393 # This used to be the top-half of _execute.  It was split out to make it
1394 #  easier to override in NoBindVars without duping the rest.  It takes up
1395 #  all of _execute's args, and emits $sql, @bind.
1396 sub _prep_for_execute {
1397   my ($self, $op, $extra_bind, $ident, $args) = @_;
1398
1399   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1400     $ident = $ident->from();
1401   }
1402
1403   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1404
1405   unshift(@bind,
1406     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1407       if $extra_bind;
1408   return ($sql, \@bind);
1409 }
1410
1411
1412 sub _fix_bind_params {
1413     my ($self, @bind) = @_;
1414
1415     ### Turn @bind from something like this:
1416     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1417     ### to this:
1418     ###   ( "'1'", "'1'", "'3'" )
1419     return
1420         map {
1421             if ( defined( $_ && $_->[1] ) ) {
1422                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1423             }
1424             else { q{'NULL'}; }
1425         } @bind;
1426 }
1427
1428 sub _query_start {
1429     my ( $self, $sql, @bind ) = @_;
1430
1431     if ( $self->debug ) {
1432         @bind = $self->_fix_bind_params(@bind);
1433
1434         $self->debugobj->query_start( $sql, @bind );
1435     }
1436 }
1437
1438 sub _query_end {
1439     my ( $self, $sql, @bind ) = @_;
1440
1441     if ( $self->debug ) {
1442         @bind = $self->_fix_bind_params(@bind);
1443         $self->debugobj->query_end( $sql, @bind );
1444     }
1445 }
1446
1447 sub _dbh_execute {
1448   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1449
1450   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1451
1452   $self->_query_start( $sql, @$bind );
1453
1454   my $sth = $self->sth($sql,$op);
1455
1456   my $placeholder_index = 1;
1457
1458   foreach my $bound (@$bind) {
1459     my $attributes = {};
1460     my($column_name, @data) = @$bound;
1461
1462     if ($bind_attributes) {
1463       $attributes = $bind_attributes->{$column_name}
1464       if defined $bind_attributes->{$column_name};
1465     }
1466
1467     foreach my $data (@data) {
1468       my $ref = ref $data;
1469       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1470
1471       $sth->bind_param($placeholder_index, $data, $attributes);
1472       $placeholder_index++;
1473     }
1474   }
1475
1476   # Can this fail without throwing an exception anyways???
1477   my $rv = $sth->execute();
1478   $self->throw_exception($sth->errstr) if !$rv;
1479
1480   $self->_query_end( $sql, @$bind );
1481
1482   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1483 }
1484
1485 sub _execute {
1486     my $self = shift;
1487     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1488 }
1489
1490 sub _prefetch_insert_auto_nextvals {
1491   my ($self, $source, $to_insert) = @_;
1492
1493   my $upd = {};
1494
1495   foreach my $col ( $source->columns ) {
1496     if ( !defined $to_insert->{$col} ) {
1497       my $col_info = $source->column_info($col);
1498
1499       if ( $col_info->{auto_nextval} ) {
1500         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1501           'nextval',
1502           $col_info->{sequence} ||=
1503             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1504         );
1505       }
1506     }
1507   }
1508
1509   return $upd;
1510 }
1511
1512 sub insert {
1513   my $self = shift;
1514   my ($source, $to_insert, $opts) = @_;
1515
1516   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1517
1518   my $bind_attributes = $self->source_bind_attributes($source);
1519
1520   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1521
1522   if ($opts->{returning}) {
1523     my @ret_cols = @{$opts->{returning}};
1524
1525     my @ret_vals = eval {
1526       local $SIG{__WARN__} = sub {};
1527       my @r = $sth->fetchrow_array;
1528       $sth->finish;
1529       @r;
1530     };
1531
1532     my %ret;
1533     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1534
1535     $updated_cols = {
1536       %$updated_cols,
1537       %ret,
1538     };
1539   }
1540
1541   return $updated_cols;
1542 }
1543
1544 ## Currently it is assumed that all values passed will be "normal", i.e. not
1545 ## scalar refs, or at least, all the same type as the first set, the statement is
1546 ## only prepped once.
1547 sub insert_bulk {
1548   my ($self, $source, $cols, $data) = @_;
1549
1550   my %colvalues;
1551   @colvalues{@$cols} = (0..$#$cols);
1552
1553   for my $i (0..$#$cols) {
1554     my $first_val = $data->[0][$i];
1555     next unless ref $first_val eq 'SCALAR';
1556
1557     $colvalues{ $cols->[$i] } = $first_val;
1558   }
1559
1560   # check for bad data and stringify stringifiable objects
1561   my $bad_slice = sub {
1562     my ($msg, $col_idx, $slice_idx) = @_;
1563     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1564       $msg,
1565       $cols->[$col_idx],
1566       do {
1567         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1568         Data::Dumper::Concise::Dumper({
1569           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1570         }),
1571       }
1572     );
1573   };
1574
1575   for my $datum_idx (0..$#$data) {
1576     my $datum = $data->[$datum_idx];
1577
1578     for my $col_idx (0..$#$cols) {
1579       my $val            = $datum->[$col_idx];
1580       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1581       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1582
1583       if ($is_literal_sql) {
1584         if (not ref $val) {
1585           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1586         }
1587         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1588           $bad_slice->("$reftype reference found where literal SQL expected",
1589             $col_idx, $datum_idx);
1590         }
1591         elsif ($$val ne $$sqla_bind){
1592           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1593             $col_idx, $datum_idx);
1594         }
1595       }
1596       elsif (my $reftype = ref $val) {
1597         require overload;
1598         if (overload::Method($val, '""')) {
1599           $datum->[$col_idx] = "".$val;
1600         }
1601         else {
1602           $bad_slice->("$reftype reference found where bind expected",
1603             $col_idx, $datum_idx);
1604         }
1605       }
1606     }
1607   }
1608
1609   my ($sql, $bind) = $self->_prep_for_execute (
1610     'insert', undef, $source, [\%colvalues]
1611   );
1612   my @bind = @$bind;
1613
1614   my $empty_bind = 1 if (not @bind) &&
1615     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1616
1617   if ((not @bind) && (not $empty_bind)) {
1618     $self->throw_exception(
1619       'Cannot insert_bulk without support for placeholders'
1620     );
1621   }
1622
1623   # neither _execute_array, nor _execute_inserts_with_no_binds are
1624   # atomic (even if _execute _array is a single call). Thus a safety
1625   # scope guard
1626   my $guard = $self->txn_scope_guard;
1627
1628   $self->_query_start( $sql, ['__BULK__'] );
1629   my $sth = $self->sth($sql);
1630   my $rv = do {
1631     if ($empty_bind) {
1632       # bind_param_array doesn't work if there are no binds
1633       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1634     }
1635     else {
1636 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1637       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1638     }
1639   };
1640
1641   $self->_query_end( $sql, ['__BULK__'] );
1642
1643   $guard->commit;
1644
1645   return (wantarray ? ($rv, $sth, @bind) : $rv);
1646 }
1647
1648 sub _execute_array {
1649   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1650
1651   ## This must be an arrayref, else nothing works!
1652   my $tuple_status = [];
1653
1654   ## Get the bind_attributes, if any exist
1655   my $bind_attributes = $self->source_bind_attributes($source);
1656
1657   ## Bind the values and execute
1658   my $placeholder_index = 1;
1659
1660   foreach my $bound (@$bind) {
1661
1662     my $attributes = {};
1663     my ($column_name, $data_index) = @$bound;
1664
1665     if( $bind_attributes ) {
1666       $attributes = $bind_attributes->{$column_name}
1667       if defined $bind_attributes->{$column_name};
1668     }
1669
1670     my @data = map { $_->[$data_index] } @$data;
1671
1672     $sth->bind_param_array(
1673       $placeholder_index,
1674       [@data],
1675       (%$attributes ?  $attributes : ()),
1676     );
1677     $placeholder_index++;
1678   }
1679
1680   my $rv = eval {
1681     $self->_dbh_execute_array($sth, $tuple_status, @extra);
1682   };
1683   my $err = $@ || $sth->errstr;
1684
1685 # Statement must finish even if there was an exception.
1686   eval { $sth->finish };
1687   $err = $@ unless $err;
1688
1689   if ($err) {
1690     my $i = 0;
1691     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1692
1693     $self->throw_exception("Unexpected populate error: $err")
1694       if ($i > $#$tuple_status);
1695
1696     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1697       ($tuple_status->[$i][1] || $err),
1698       Data::Dumper::Concise::Dumper({
1699         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1700       }),
1701     );
1702   }
1703   return $rv;
1704 }
1705
1706 sub _dbh_execute_array {
1707     my ($self, $sth, $tuple_status, @extra) = @_;
1708
1709     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1710 }
1711
1712 sub _dbh_execute_inserts_with_no_binds {
1713   my ($self, $sth, $count) = @_;
1714
1715   eval {
1716     my $dbh = $self->_get_dbh;
1717     local $dbh->{RaiseError} = 1;
1718     local $dbh->{PrintError} = 0;
1719
1720     $sth->execute foreach 1..$count;
1721   };
1722   my $exception = $@;
1723
1724 # Make sure statement is finished even if there was an exception.
1725   eval { $sth->finish };
1726   $exception = $@ unless $exception;
1727
1728   $self->throw_exception($exception) if $exception;
1729
1730   return $count;
1731 }
1732
1733 sub update {
1734   my ($self, $source, @args) = @_;
1735
1736   my $bind_attrs = $self->source_bind_attributes($source);
1737
1738   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1739 }
1740
1741
1742 sub delete {
1743   my ($self, $source, @args) = @_;
1744
1745   my $bind_attrs = $self->source_bind_attributes($source);
1746
1747   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1748 }
1749
1750 # We were sent here because the $rs contains a complex search
1751 # which will require a subquery to select the correct rows
1752 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1753 #
1754 # Generating a single PK column subquery is trivial and supported
1755 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1756 # Look at _multipk_update_delete()
1757 sub _subq_update_delete {
1758   my $self = shift;
1759   my ($rs, $op, $values) = @_;
1760
1761   my $rsrc = $rs->result_source;
1762
1763   # quick check if we got a sane rs on our hands
1764   my @pcols = $rsrc->_pri_cols;
1765
1766   my $sel = $rs->_resolved_attrs->{select};
1767   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1768
1769   if (
1770       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1771         ne
1772       join ("\x00", sort @$sel )
1773   ) {
1774     $self->throw_exception (
1775       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1776     );
1777   }
1778
1779   if (@pcols == 1) {
1780     return $self->$op (
1781       $rsrc,
1782       $op eq 'update' ? $values : (),
1783       { $pcols[0] => { -in => $rs->as_query } },
1784     );
1785   }
1786
1787   else {
1788     return $self->_multipk_update_delete (@_);
1789   }
1790 }
1791
1792 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1793 # resultset update/delete involving subqueries. So by default resort
1794 # to simple (and inefficient) delete_all style per-row opearations,
1795 # while allowing specific storages to override this with a faster
1796 # implementation.
1797 #
1798 sub _multipk_update_delete {
1799   return shift->_per_row_update_delete (@_);
1800 }
1801
1802 # This is the default loop used to delete/update rows for multi PK
1803 # resultsets, and used by mysql exclusively (because it can't do anything
1804 # else).
1805 #
1806 # We do not use $row->$op style queries, because resultset update/delete
1807 # is not expected to cascade (this is what delete_all/update_all is for).
1808 #
1809 # There should be no race conditions as the entire operation is rolled
1810 # in a transaction.
1811 #
1812 sub _per_row_update_delete {
1813   my $self = shift;
1814   my ($rs, $op, $values) = @_;
1815
1816   my $rsrc = $rs->result_source;
1817   my @pcols = $rsrc->_pri_cols;
1818
1819   my $guard = $self->txn_scope_guard;
1820
1821   # emulate the return value of $sth->execute for non-selects
1822   my $row_cnt = '0E0';
1823
1824   my $subrs_cur = $rs->cursor;
1825   my @all_pk = $subrs_cur->all;
1826   for my $pks ( @all_pk) {
1827
1828     my $cond;
1829     for my $i (0.. $#pcols) {
1830       $cond->{$pcols[$i]} = $pks->[$i];
1831     }
1832
1833     $self->$op (
1834       $rsrc,
1835       $op eq 'update' ? $values : (),
1836       $cond,
1837     );
1838
1839     $row_cnt++;
1840   }
1841
1842   $guard->commit;
1843
1844   return $row_cnt;
1845 }
1846
1847 sub _select {
1848   my $self = shift;
1849   $self->_execute($self->_select_args(@_));
1850 }
1851
1852 sub _select_args_to_query {
1853   my $self = shift;
1854
1855   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1856   #  = $self->_select_args($ident, $select, $cond, $attrs);
1857   my ($op, $bind, $ident, $bind_attrs, @args) =
1858     $self->_select_args(@_);
1859
1860   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1861   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1862   $prepared_bind ||= [];
1863
1864   return wantarray
1865     ? ($sql, $prepared_bind, $bind_attrs)
1866     : \[ "($sql)", @$prepared_bind ]
1867   ;
1868 }
1869
1870 sub _select_args {
1871   my ($self, $ident, $select, $where, $attrs) = @_;
1872
1873   my $sql_maker = $self->sql_maker;
1874   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1875
1876   $attrs = {
1877     %$attrs,
1878     select => $select,
1879     from => $ident,
1880     where => $where,
1881     $rs_alias && $alias2source->{$rs_alias}
1882       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
1883       : ()
1884     ,
1885   };
1886
1887   # calculate bind_attrs before possible $ident mangling
1888   my $bind_attrs = {};
1889   for my $alias (keys %$alias2source) {
1890     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1891     for my $col (keys %$bindtypes) {
1892
1893       my $fqcn = join ('.', $alias, $col);
1894       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1895
1896       # Unqialified column names are nice, but at the same time can be
1897       # rather ambiguous. What we do here is basically go along with
1898       # the loop, adding an unqualified column slot to $bind_attrs,
1899       # alongside the fully qualified name. As soon as we encounter
1900       # another column by that name (which would imply another table)
1901       # we unset the unqualified slot and never add any info to it
1902       # to avoid erroneous type binding. If this happens the users
1903       # only choice will be to fully qualify his column name
1904
1905       if (exists $bind_attrs->{$col}) {
1906         $bind_attrs->{$col} = {};
1907       }
1908       else {
1909         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1910       }
1911     }
1912   }
1913
1914   # adjust limits
1915   if (
1916     $attrs->{software_limit}
1917       ||
1918     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1919   ) {
1920     $attrs->{software_limit} = 1;
1921   }
1922   else {
1923     $self->throw_exception("rows attribute must be positive if present")
1924       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1925
1926     # MySQL actually recommends this approach.  I cringe.
1927     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1928   }
1929
1930   my @limit;
1931
1932   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
1933   # storage, unless software limit was requested
1934   if (
1935     #limited has_many
1936     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1937        ||
1938     # grouped prefetch (to satisfy group_by == select)
1939     ( $attrs->{group_by}
1940         &&
1941       @{$attrs->{group_by}}
1942         &&
1943       $attrs->{_prefetch_select}
1944         &&
1945       @{$attrs->{_prefetch_select}}
1946     )
1947   ) {
1948     ($ident, $select, $where, $attrs)
1949       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1950   }
1951   elsif (! $attrs->{software_limit} ) {
1952     push @limit, $attrs->{rows}, $attrs->{offset};
1953   }
1954
1955   # try to simplify the joinmap further (prune unreferenced type-single joins)
1956   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
1957
1958 ###
1959   # This would be the point to deflate anything found in $where
1960   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1961   # expect a row object. And all we have is a resultsource (it is trivial
1962   # to extract deflator coderefs via $alias2source above).
1963   #
1964   # I don't see a way forward other than changing the way deflators are
1965   # invoked, and that's just bad...
1966 ###
1967
1968   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
1969 }
1970
1971 # Returns a counting SELECT for a simple count
1972 # query. Abstracted so that a storage could override
1973 # this to { count => 'firstcol' } or whatever makes
1974 # sense as a performance optimization
1975 sub _count_select {
1976   #my ($self, $source, $rs_attrs) = @_;
1977   return { count => '*' };
1978 }
1979
1980
1981 sub source_bind_attributes {
1982   my ($self, $source) = @_;
1983
1984   my $bind_attributes;
1985   foreach my $column ($source->columns) {
1986
1987     my $data_type = $source->column_info($column)->{data_type} || '';
1988     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1989      if $data_type;
1990   }
1991
1992   return $bind_attributes;
1993 }
1994
1995 =head2 select
1996
1997 =over 4
1998
1999 =item Arguments: $ident, $select, $condition, $attrs
2000
2001 =back
2002
2003 Handle a SQL select statement.
2004
2005 =cut
2006
2007 sub select {
2008   my $self = shift;
2009   my ($ident, $select, $condition, $attrs) = @_;
2010   return $self->cursor_class->new($self, \@_, $attrs);
2011 }
2012
2013 sub select_single {
2014   my $self = shift;
2015   my ($rv, $sth, @bind) = $self->_select(@_);
2016   my @row = $sth->fetchrow_array;
2017   my @nextrow = $sth->fetchrow_array if @row;
2018   if(@row && @nextrow) {
2019     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2020   }
2021   # Need to call finish() to work round broken DBDs
2022   $sth->finish();
2023   return @row;
2024 }
2025
2026 =head2 sth
2027
2028 =over 4
2029
2030 =item Arguments: $sql
2031
2032 =back
2033
2034 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2035
2036 =cut
2037
2038 sub _dbh_sth {
2039   my ($self, $dbh, $sql) = @_;
2040
2041   # 3 is the if_active parameter which avoids active sth re-use
2042   my $sth = $self->disable_sth_caching
2043     ? $dbh->prepare($sql)
2044     : $dbh->prepare_cached($sql, {}, 3);
2045
2046   # XXX You would think RaiseError would make this impossible,
2047   #  but apparently that's not true :(
2048   $self->throw_exception($dbh->errstr) if !$sth;
2049
2050   $sth;
2051 }
2052
2053 sub sth {
2054   my ($self, $sql) = @_;
2055   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2056 }
2057
2058 sub _dbh_columns_info_for {
2059   my ($self, $dbh, $table) = @_;
2060
2061   if ($dbh->can('column_info')) {
2062     my %result;
2063     eval {
2064       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2065       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2066       $sth->execute();
2067       while ( my $info = $sth->fetchrow_hashref() ){
2068         my %column_info;
2069         $column_info{data_type}   = $info->{TYPE_NAME};
2070         $column_info{size}      = $info->{COLUMN_SIZE};
2071         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2072         $column_info{default_value} = $info->{COLUMN_DEF};
2073         my $col_name = $info->{COLUMN_NAME};
2074         $col_name =~ s/^\"(.*)\"$/$1/;
2075
2076         $result{$col_name} = \%column_info;
2077       }
2078     };
2079     return \%result if !$@ && scalar keys %result;
2080   }
2081
2082   my %result;
2083   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2084   $sth->execute;
2085   my @columns = @{$sth->{NAME_lc}};
2086   for my $i ( 0 .. $#columns ){
2087     my %column_info;
2088     $column_info{data_type} = $sth->{TYPE}->[$i];
2089     $column_info{size} = $sth->{PRECISION}->[$i];
2090     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2091
2092     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2093       $column_info{data_type} = $1;
2094       $column_info{size}    = $2;
2095     }
2096
2097     $result{$columns[$i]} = \%column_info;
2098   }
2099   $sth->finish;
2100
2101   foreach my $col (keys %result) {
2102     my $colinfo = $result{$col};
2103     my $type_num = $colinfo->{data_type};
2104     my $type_name;
2105     if(defined $type_num && $dbh->can('type_info')) {
2106       my $type_info = $dbh->type_info($type_num);
2107       $type_name = $type_info->{TYPE_NAME} if $type_info;
2108       $colinfo->{data_type} = $type_name if $type_name;
2109     }
2110   }
2111
2112   return \%result;
2113 }
2114
2115 sub columns_info_for {
2116   my ($self, $table) = @_;
2117   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2118 }
2119
2120 =head2 last_insert_id
2121
2122 Return the row id of the last insert.
2123
2124 =cut
2125
2126 sub _dbh_last_insert_id {
2127     my ($self, $dbh, $source, $col) = @_;
2128
2129     my $id = eval { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2130
2131     return $id if defined $id;
2132
2133     my $class = ref $self;
2134     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2135 }
2136
2137 sub last_insert_id {
2138   my $self = shift;
2139   $self->_dbh_last_insert_id ($self->_dbh, @_);
2140 }
2141
2142 =head2 _native_data_type
2143
2144 =over 4
2145
2146 =item Arguments: $type_name
2147
2148 =back
2149
2150 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2151 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2152 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2153
2154 The default implementation returns C<undef>, implement in your Storage driver if
2155 you need this functionality.
2156
2157 Should map types from other databases to the native RDBMS type, for example
2158 C<VARCHAR2> to C<VARCHAR>.
2159
2160 Types with modifiers should map to the underlying data type. For example,
2161 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2162
2163 Composite types should map to the container type, for example
2164 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2165
2166 =cut
2167
2168 sub _native_data_type {
2169   #my ($self, $data_type) = @_;
2170   return undef
2171 }
2172
2173 # Check if placeholders are supported at all
2174 sub _placeholders_supported {
2175   my $self = shift;
2176   my $dbh  = $self->_get_dbh;
2177
2178   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2179   # but it is inaccurate more often than not
2180   eval {
2181     local $dbh->{PrintError} = 0;
2182     local $dbh->{RaiseError} = 1;
2183     $dbh->do('select ?', {}, 1);
2184   };
2185   return $@ ? 0 : 1;
2186 }
2187
2188 # Check if placeholders bound to non-string types throw exceptions
2189 #
2190 sub _typeless_placeholders_supported {
2191   my $self = shift;
2192   my $dbh  = $self->_get_dbh;
2193
2194   eval {
2195     local $dbh->{PrintError} = 0;
2196     local $dbh->{RaiseError} = 1;
2197     # this specifically tests a bind that is NOT a string
2198     $dbh->do('select 1 where 1 = ?', {}, 1);
2199   };
2200   return $@ ? 0 : 1;
2201 }
2202
2203 =head2 sqlt_type
2204
2205 Returns the database driver name.
2206
2207 =cut
2208
2209 sub sqlt_type {
2210   shift->_get_dbh->{Driver}->{Name};
2211 }
2212
2213 =head2 bind_attribute_by_data_type
2214
2215 Given a datatype from column info, returns a database specific bind
2216 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2217 let the database planner just handle it.
2218
2219 Generally only needed for special case column types, like bytea in postgres.
2220
2221 =cut
2222
2223 sub bind_attribute_by_data_type {
2224     return;
2225 }
2226
2227 =head2 is_datatype_numeric
2228
2229 Given a datatype from column_info, returns a boolean value indicating if
2230 the current RDBMS considers it a numeric value. This controls how
2231 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2232 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2233 be performed instead of the usual C<eq>.
2234
2235 =cut
2236
2237 sub is_datatype_numeric {
2238   my ($self, $dt) = @_;
2239
2240   return 0 unless $dt;
2241
2242   return $dt =~ /^ (?:
2243     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2244   ) $/ix;
2245 }
2246
2247
2248 =head2 create_ddl_dir
2249
2250 =over 4
2251
2252 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2253
2254 =back
2255
2256 Creates a SQL file based on the Schema, for each of the specified
2257 database engines in C<\@databases> in the given directory.
2258 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2259
2260 Given a previous version number, this will also create a file containing
2261 the ALTER TABLE statements to transform the previous schema into the
2262 current one. Note that these statements may contain C<DROP TABLE> or
2263 C<DROP COLUMN> statements that can potentially destroy data.
2264
2265 The file names are created using the C<ddl_filename> method below, please
2266 override this method in your schema if you would like a different file
2267 name format. For the ALTER file, the same format is used, replacing
2268 $version in the name with "$preversion-$version".
2269
2270 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2271 The most common value for this would be C<< { add_drop_table => 1 } >>
2272 to have the SQL produced include a C<DROP TABLE> statement for each table
2273 created. For quoting purposes supply C<quote_table_names> and
2274 C<quote_field_names>.
2275
2276 If no arguments are passed, then the following default values are assumed:
2277
2278 =over 4
2279
2280 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2281
2282 =item version    - $schema->schema_version
2283
2284 =item directory  - './'
2285
2286 =item preversion - <none>
2287
2288 =back
2289
2290 By default, C<\%sqlt_args> will have
2291
2292  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2293
2294 merged with the hash passed in. To disable any of those features, pass in a
2295 hashref like the following
2296
2297  { ignore_constraint_names => 0, # ... other options }
2298
2299
2300 WARNING: You are strongly advised to check all SQL files created, before applying
2301 them.
2302
2303 =cut
2304
2305 sub create_ddl_dir {
2306   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2307
2308   unless ($dir) {
2309     carp "No directory given, using ./\n";
2310     $dir = './';
2311   } else {
2312       -d $dir or File::Path::mkpath($dir)
2313           or $self->throw_exception("create_ddl_dir: $! creating dir '$dir'");
2314   }
2315
2316   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2317
2318   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2319   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2320
2321   my $schema_version = $schema->schema_version || '1.x';
2322   $version ||= $schema_version;
2323
2324   $sqltargs = {
2325     add_drop_table => 1,
2326     ignore_constraint_names => 1,
2327     ignore_index_names => 1,
2328     %{$sqltargs || {}}
2329   };
2330
2331   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2332     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2333   }
2334
2335   my $sqlt = SQL::Translator->new( $sqltargs );
2336
2337   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2338   my $sqlt_schema = $sqlt->translate({ data => $schema })
2339     or $self->throw_exception ($sqlt->error);
2340
2341   foreach my $db (@$databases) {
2342     $sqlt->reset();
2343     $sqlt->{schema} = $sqlt_schema;
2344     $sqlt->producer($db);
2345
2346     my $file;
2347     my $filename = $schema->ddl_filename($db, $version, $dir);
2348     if (-e $filename && ($version eq $schema_version )) {
2349       # if we are dumping the current version, overwrite the DDL
2350       carp "Overwriting existing DDL file - $filename";
2351       unlink($filename);
2352     }
2353
2354     my $output = $sqlt->translate;
2355     if(!$output) {
2356       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2357       next;
2358     }
2359     if(!open($file, ">$filename")) {
2360       $self->throw_exception("Can't open $filename for writing ($!)");
2361       next;
2362     }
2363     print $file $output;
2364     close($file);
2365
2366     next unless ($preversion);
2367
2368     require SQL::Translator::Diff;
2369
2370     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2371     if(!-e $prefilename) {
2372       carp("No previous schema file found ($prefilename)");
2373       next;
2374     }
2375
2376     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2377     if(-e $difffile) {
2378       carp("Overwriting existing diff file - $difffile");
2379       unlink($difffile);
2380     }
2381
2382     my $source_schema;
2383     {
2384       my $t = SQL::Translator->new($sqltargs);
2385       $t->debug( 0 );
2386       $t->trace( 0 );
2387
2388       $t->parser( $db )
2389         or $self->throw_exception ($t->error);
2390
2391       my $out = $t->translate( $prefilename )
2392         or $self->throw_exception ($t->error);
2393
2394       $source_schema = $t->schema;
2395
2396       $source_schema->name( $prefilename )
2397         unless ( $source_schema->name );
2398     }
2399
2400     # The "new" style of producers have sane normalization and can support
2401     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2402     # And we have to diff parsed SQL against parsed SQL.
2403     my $dest_schema = $sqlt_schema;
2404
2405     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2406       my $t = SQL::Translator->new($sqltargs);
2407       $t->debug( 0 );
2408       $t->trace( 0 );
2409
2410       $t->parser( $db )
2411         or $self->throw_exception ($t->error);
2412
2413       my $out = $t->translate( $filename )
2414         or $self->throw_exception ($t->error);
2415
2416       $dest_schema = $t->schema;
2417
2418       $dest_schema->name( $filename )
2419         unless $dest_schema->name;
2420     }
2421
2422     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2423                                                   $dest_schema,   $db,
2424                                                   $sqltargs
2425                                                  );
2426     if(!open $file, ">$difffile") {
2427       $self->throw_exception("Can't write to $difffile ($!)");
2428       next;
2429     }
2430     print $file $diff;
2431     close($file);
2432   }
2433 }
2434
2435 =head2 deployment_statements
2436
2437 =over 4
2438
2439 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2440
2441 =back
2442
2443 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2444
2445 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2446 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2447
2448 C<$directory> is used to return statements from files in a previously created
2449 L</create_ddl_dir> directory and is optional. The filenames are constructed
2450 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2451
2452 If no C<$directory> is specified then the statements are constructed on the
2453 fly using L<SQL::Translator> and C<$version> is ignored.
2454
2455 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2456
2457 =cut
2458
2459 sub deployment_statements {
2460   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2461   $type ||= $self->sqlt_type;
2462   $version ||= $schema->schema_version || '1.x';
2463   $dir ||= './';
2464   my $filename = $schema->ddl_filename($type, $version, $dir);
2465   if(-f $filename)
2466   {
2467       my $file;
2468       open($file, "<$filename")
2469         or $self->throw_exception("Can't open $filename ($!)");
2470       my @rows = <$file>;
2471       close($file);
2472       return join('', @rows);
2473   }
2474
2475   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2476     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2477   }
2478
2479   # sources needs to be a parser arg, but for simplicty allow at top level
2480   # coming in
2481   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2482       if exists $sqltargs->{sources};
2483
2484   my $tr = SQL::Translator->new(
2485     producer => "SQL::Translator::Producer::${type}",
2486     %$sqltargs,
2487     parser => 'SQL::Translator::Parser::DBIx::Class',
2488     data => $schema,
2489   );
2490
2491   my @ret;
2492   my $wa = wantarray;
2493   if ($wa) {
2494     @ret = $tr->translate;
2495   }
2496   else {
2497     $ret[0] = $tr->translate;
2498   }
2499
2500   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2501     unless (@ret && defined $ret[0]);
2502
2503   return $wa ? @ret : $ret[0];
2504 }
2505
2506 sub deploy {
2507   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2508   my $deploy = sub {
2509     my $line = shift;
2510     return if($line =~ /^--/);
2511     return if(!$line);
2512     # next if($line =~ /^DROP/m);
2513     return if($line =~ /^BEGIN TRANSACTION/m);
2514     return if($line =~ /^COMMIT/m);
2515     return if $line =~ /^\s+$/; # skip whitespace only
2516     $self->_query_start($line);
2517     eval {
2518       # do a dbh_do cycle here, as we need some error checking in
2519       # place (even though we will ignore errors)
2520       $self->dbh_do (sub { $_[1]->do($line) });
2521     };
2522     if ($@) {
2523       carp qq{$@ (running "${line}")};
2524     }
2525     $self->_query_end($line);
2526   };
2527   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2528   if (@statements > 1) {
2529     foreach my $statement (@statements) {
2530       $deploy->( $statement );
2531     }
2532   }
2533   elsif (@statements == 1) {
2534     foreach my $line ( split(";\n", $statements[0])) {
2535       $deploy->( $line );
2536     }
2537   }
2538 }
2539
2540 =head2 datetime_parser
2541
2542 Returns the datetime parser class
2543
2544 =cut
2545
2546 sub datetime_parser {
2547   my $self = shift;
2548   return $self->{datetime_parser} ||= do {
2549     $self->build_datetime_parser(@_);
2550   };
2551 }
2552
2553 =head2 datetime_parser_type
2554
2555 Defines (returns) the datetime parser class - currently hardwired to
2556 L<DateTime::Format::MySQL>
2557
2558 =cut
2559
2560 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2561
2562 =head2 build_datetime_parser
2563
2564 See L</datetime_parser>
2565
2566 =cut
2567
2568 sub build_datetime_parser {
2569   my $self = shift;
2570   my $type = $self->datetime_parser_type(@_);
2571   $self->ensure_class_loaded ($type);
2572   return $type;
2573 }
2574
2575
2576 =head2 is_replicating
2577
2578 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2579 replicate from a master database.  Default is undef, which is the result
2580 returned by databases that don't support replication.
2581
2582 =cut
2583
2584 sub is_replicating {
2585     return;
2586
2587 }
2588
2589 =head2 lag_behind_master
2590
2591 Returns a number that represents a certain amount of lag behind a master db
2592 when a given storage is replicating.  The number is database dependent, but
2593 starts at zero and increases with the amount of lag. Default in undef
2594
2595 =cut
2596
2597 sub lag_behind_master {
2598     return;
2599 }
2600
2601 =head2 relname_to_table_alias
2602
2603 =over 4
2604
2605 =item Arguments: $relname, $join_count
2606
2607 =back
2608
2609 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2610 queries.
2611
2612 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2613 way these aliases are named.
2614
2615 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2616 otherwise C<"$relname">.
2617
2618 =cut
2619
2620 sub relname_to_table_alias {
2621   my ($self, $relname, $join_count) = @_;
2622
2623   my $alias = ($join_count && $join_count > 1 ?
2624     join('_', $relname, $join_count) : $relname);
2625
2626   return $alias;
2627 }
2628
2629 1;
2630
2631 =head1 USAGE NOTES
2632
2633 =head2 DBIx::Class and AutoCommit
2634
2635 DBIx::Class can do some wonderful magic with handling exceptions,
2636 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2637 (the default) combined with C<txn_do> for transaction support.
2638
2639 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2640 in an assumed transaction between commits, and you're telling us you'd
2641 like to manage that manually.  A lot of the magic protections offered by
2642 this module will go away.  We can't protect you from exceptions due to database
2643 disconnects because we don't know anything about how to restart your
2644 transactions.  You're on your own for handling all sorts of exceptional
2645 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2646 be with raw DBI.
2647
2648
2649 =head1 AUTHORS
2650
2651 Matt S. Trout <mst@shadowcatsystems.co.uk>
2652
2653 Andy Grundman <andy@hybridized.org>
2654
2655 =head1 LICENSE
2656
2657 You may distribute this code under the same terms as Perl itself.
2658
2659 =cut