use namespace::clean w/ Try::Tiny
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17 use Sub::Name ();
18 use Try::Tiny;
19 use File::Path ();
20 use namespace::clean;
21
22 __PACKAGE__->mk_group_accessors('simple' =>
23   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
24      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints
25      _server_info_hash/
26 );
27
28 # the values for these accessors are picked out (and deleted) from
29 # the attribute hashref passed to connect_info
30 my @storage_options = qw/
31   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
32   disable_sth_caching unsafe auto_savepoint
33 /;
34 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
35
36
37 # default cursor class, overridable in connect_info attributes
38 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
39
40 __PACKAGE__->mk_group_accessors('inherited' => qw/
41   sql_maker_class
42   _supports_insert_returning
43 /);
44 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
45
46 # Each of these methods need _determine_driver called before itself
47 # in order to function reliably. This is a purely DRY optimization
48 my @rdbms_specific_methods = qw/
49   deployment_statements
50   sqlt_type
51   sql_maker
52   build_datetime_parser
53   datetime_parser_type
54
55   insert
56   insert_bulk
57   update
58   delete
59   select
60   select_single
61 /;
62
63 for my $meth (@rdbms_specific_methods) {
64
65   my $orig = __PACKAGE__->can ($meth)
66     or next;
67
68   no strict qw/refs/;
69   no warnings qw/redefine/;
70   *{__PACKAGE__ ."::$meth"} = Sub::Name::subname $meth => sub {
71     if (not $_[0]->_driver_determined) {
72       $_[0]->_determine_driver;
73       goto $_[0]->can($meth);
74     }
75     $orig->(@_);
76   };
77 }
78
79
80 =head1 NAME
81
82 DBIx::Class::Storage::DBI - DBI storage handler
83
84 =head1 SYNOPSIS
85
86   my $schema = MySchema->connect('dbi:SQLite:my.db');
87
88   $schema->storage->debug(1);
89
90   my @stuff = $schema->storage->dbh_do(
91     sub {
92       my ($storage, $dbh, @args) = @_;
93       $dbh->do("DROP TABLE authors");
94     },
95     @column_list
96   );
97
98   $schema->resultset('Book')->search({
99      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
100   });
101
102 =head1 DESCRIPTION
103
104 This class represents the connection to an RDBMS via L<DBI>.  See
105 L<DBIx::Class::Storage> for general information.  This pod only
106 documents DBI-specific methods and behaviors.
107
108 =head1 METHODS
109
110 =cut
111
112 sub new {
113   my $new = shift->next::method(@_);
114
115   $new->transaction_depth(0);
116   $new->_sql_maker_opts({});
117   $new->{savepoints} = [];
118   $new->{_in_dbh_do} = 0;
119   $new->{_dbh_gen} = 0;
120
121   # read below to see what this does
122   $new->_arm_global_destructor;
123
124   $new;
125 }
126
127 # This is hack to work around perl shooting stuff in random
128 # order on exit(). If we do not walk the remaining storage
129 # objects in an END block, there is a *small but real* chance
130 # of a fork()ed child to kill the parent's shared DBI handle,
131 # *before perl reaches the DESTROY in this package*
132 # Yes, it is ugly and effective.
133 {
134   my %seek_and_destroy;
135
136   sub _arm_global_destructor {
137     my $self = shift;
138     my $key = Scalar::Util::refaddr ($self);
139     $seek_and_destroy{$key} = $self;
140     Scalar::Util::weaken ($seek_and_destroy{$key});
141   }
142
143   END {
144     local $?; # just in case the DBI destructor changes it somehow
145
146     # destroy just the object if not native to this process/thread
147     $_->_preserve_foreign_dbh for (grep
148       { defined $_ }
149       values %seek_and_destroy
150     );
151   }
152 }
153
154 sub DESTROY {
155   my $self = shift;
156
157   # destroy just the object if not native to this process/thread
158   $self->_preserve_foreign_dbh;
159
160   # some databases need this to stop spewing warnings
161   if (my $dbh = $self->_dbh) {
162     try {
163       %{ $dbh->{CachedKids} } = ();
164       $dbh->disconnect;
165     };
166   }
167
168   $self->_dbh(undef);
169 }
170
171 sub _preserve_foreign_dbh {
172   my $self = shift;
173
174   return unless $self->_dbh;
175
176   $self->_verify_tid;
177
178   return unless $self->_dbh;
179
180   $self->_verify_pid;
181
182 }
183
184 # handle pid changes correctly - do not destroy parent's connection
185 sub _verify_pid {
186   my $self = shift;
187
188   return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
189
190   $self->_dbh->{InactiveDestroy} = 1;
191   $self->_dbh(undef);
192   $self->{_dbh_gen}++;
193
194   return;
195 }
196
197 # very similar to above, but seems to FAIL if I set InactiveDestroy
198 sub _verify_tid {
199   my $self = shift;
200
201   if ( ! defined $self->_conn_tid ) {
202     return; # no threads
203   }
204   elsif ( $self->_conn_tid == threads->tid ) {
205     return; # same thread
206   }
207
208   #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
209   $self->_dbh(undef);
210   $self->{_dbh_gen}++;
211
212   return;
213 }
214
215
216 =head2 connect_info
217
218 This method is normally called by L<DBIx::Class::Schema/connection>, which
219 encapsulates its argument list in an arrayref before passing them here.
220
221 The argument list may contain:
222
223 =over
224
225 =item *
226
227 The same 4-element argument set one would normally pass to
228 L<DBI/connect>, optionally followed by
229 L<extra attributes|/DBIx::Class specific connection attributes>
230 recognized by DBIx::Class:
231
232   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
233
234 =item *
235
236 A single code reference which returns a connected
237 L<DBI database handle|DBI/connect> optionally followed by
238 L<extra attributes|/DBIx::Class specific connection attributes> recognized
239 by DBIx::Class:
240
241   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
242
243 =item *
244
245 A single hashref with all the attributes and the dsn/user/password
246 mixed together:
247
248   $connect_info_args = [{
249     dsn => $dsn,
250     user => $user,
251     password => $pass,
252     %dbi_attributes,
253     %extra_attributes,
254   }];
255
256   $connect_info_args = [{
257     dbh_maker => sub { DBI->connect (...) },
258     %dbi_attributes,
259     %extra_attributes,
260   }];
261
262 This is particularly useful for L<Catalyst> based applications, allowing the
263 following config (L<Config::General> style):
264
265   <Model::DB>
266     schema_class   App::DB
267     <connect_info>
268       dsn          dbi:mysql:database=test
269       user         testuser
270       password     TestPass
271       AutoCommit   1
272     </connect_info>
273   </Model::DB>
274
275 The C<dsn>/C<user>/C<password> combination can be substituted by the
276 C<dbh_maker> key whose value is a coderef that returns a connected
277 L<DBI database handle|DBI/connect>
278
279 =back
280
281 Please note that the L<DBI> docs recommend that you always explicitly
282 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
283 recommends that it be set to I<1>, and that you perform transactions
284 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
285 to I<1> if you do not do explicitly set it to zero.  This is the default
286 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
287
288 =head3 DBIx::Class specific connection attributes
289
290 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
291 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
292 the following connection options. These options can be mixed in with your other
293 L<DBI> connection attributes, or placed in a separate hashref
294 (C<\%extra_attributes>) as shown above.
295
296 Every time C<connect_info> is invoked, any previous settings for
297 these options will be cleared before setting the new ones, regardless of
298 whether any options are specified in the new C<connect_info>.
299
300
301 =over
302
303 =item on_connect_do
304
305 Specifies things to do immediately after connecting or re-connecting to
306 the database.  Its value may contain:
307
308 =over
309
310 =item a scalar
311
312 This contains one SQL statement to execute.
313
314 =item an array reference
315
316 This contains SQL statements to execute in order.  Each element contains
317 a string or a code reference that returns a string.
318
319 =item a code reference
320
321 This contains some code to execute.  Unlike code references within an
322 array reference, its return value is ignored.
323
324 =back
325
326 =item on_disconnect_do
327
328 Takes arguments in the same form as L</on_connect_do> and executes them
329 immediately before disconnecting from the database.
330
331 Note, this only runs if you explicitly call L</disconnect> on the
332 storage object.
333
334 =item on_connect_call
335
336 A more generalized form of L</on_connect_do> that calls the specified
337 C<connect_call_METHOD> methods in your storage driver.
338
339   on_connect_do => 'select 1'
340
341 is equivalent to:
342
343   on_connect_call => [ [ do_sql => 'select 1' ] ]
344
345 Its values may contain:
346
347 =over
348
349 =item a scalar
350
351 Will call the C<connect_call_METHOD> method.
352
353 =item a code reference
354
355 Will execute C<< $code->($storage) >>
356
357 =item an array reference
358
359 Each value can be a method name or code reference.
360
361 =item an array of arrays
362
363 For each array, the first item is taken to be the C<connect_call_> method name
364 or code reference, and the rest are parameters to it.
365
366 =back
367
368 Some predefined storage methods you may use:
369
370 =over
371
372 =item do_sql
373
374 Executes a SQL string or a code reference that returns a SQL string. This is
375 what L</on_connect_do> and L</on_disconnect_do> use.
376
377 It can take:
378
379 =over
380
381 =item a scalar
382
383 Will execute the scalar as SQL.
384
385 =item an arrayref
386
387 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
388 attributes hashref and bind values.
389
390 =item a code reference
391
392 Will execute C<< $code->($storage) >> and execute the return array refs as
393 above.
394
395 =back
396
397 =item datetime_setup
398
399 Execute any statements necessary to initialize the database session to return
400 and accept datetime/timestamp values used with
401 L<DBIx::Class::InflateColumn::DateTime>.
402
403 Only necessary for some databases, see your specific storage driver for
404 implementation details.
405
406 =back
407
408 =item on_disconnect_call
409
410 Takes arguments in the same form as L</on_connect_call> and executes them
411 immediately before disconnecting from the database.
412
413 Calls the C<disconnect_call_METHOD> methods as opposed to the
414 C<connect_call_METHOD> methods called by L</on_connect_call>.
415
416 Note, this only runs if you explicitly call L</disconnect> on the
417 storage object.
418
419 =item disable_sth_caching
420
421 If set to a true value, this option will disable the caching of
422 statement handles via L<DBI/prepare_cached>.
423
424 =item limit_dialect
425
426 Sets the limit dialect. This is useful for JDBC-bridge among others
427 where the remote SQL-dialect cannot be determined by the name of the
428 driver alone. See also L<SQL::Abstract::Limit>.
429
430 =item quote_char
431
432 Specifies what characters to use to quote table and column names. If
433 you use this you will want to specify L</name_sep> as well.
434
435 C<quote_char> expects either a single character, in which case is it
436 is placed on either side of the table/column name, or an arrayref of length
437 2 in which case the table/column name is placed between the elements.
438
439 For example under MySQL you should use C<< quote_char => '`' >>, and for
440 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
441
442 =item name_sep
443
444 This only needs to be used in conjunction with C<quote_char>, and is used to
445 specify the character that separates elements (schemas, tables, columns) from
446 each other. In most cases this is simply a C<.>.
447
448 The consequences of not supplying this value is that L<SQL::Abstract>
449 will assume DBIx::Class' uses of aliases to be complete column
450 names. The output will look like I<"me.name"> when it should actually
451 be I<"me"."name">.
452
453 =item unsafe
454
455 This Storage driver normally installs its own C<HandleError>, sets
456 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
457 all database handles, including those supplied by a coderef.  It does this
458 so that it can have consistent and useful error behavior.
459
460 If you set this option to a true value, Storage will not do its usual
461 modifications to the database handle's attributes, and instead relies on
462 the settings in your connect_info DBI options (or the values you set in
463 your connection coderef, in the case that you are connecting via coderef).
464
465 Note that your custom settings can cause Storage to malfunction,
466 especially if you set a C<HandleError> handler that suppresses exceptions
467 and/or disable C<RaiseError>.
468
469 =item auto_savepoint
470
471 If this option is true, L<DBIx::Class> will use savepoints when nesting
472 transactions, making it possible to recover from failure in the inner
473 transaction without having to abort all outer transactions.
474
475 =item cursor_class
476
477 Use this argument to supply a cursor class other than the default
478 L<DBIx::Class::Storage::DBI::Cursor>.
479
480 =back
481
482 Some real-life examples of arguments to L</connect_info> and
483 L<DBIx::Class::Schema/connect>
484
485   # Simple SQLite connection
486   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
487
488   # Connect via subref
489   ->connect_info([ sub { DBI->connect(...) } ]);
490
491   # Connect via subref in hashref
492   ->connect_info([{
493     dbh_maker => sub { DBI->connect(...) },
494     on_connect_do => 'alter session ...',
495   }]);
496
497   # A bit more complicated
498   ->connect_info(
499     [
500       'dbi:Pg:dbname=foo',
501       'postgres',
502       'my_pg_password',
503       { AutoCommit => 1 },
504       { quote_char => q{"}, name_sep => q{.} },
505     ]
506   );
507
508   # Equivalent to the previous example
509   ->connect_info(
510     [
511       'dbi:Pg:dbname=foo',
512       'postgres',
513       'my_pg_password',
514       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
515     ]
516   );
517
518   # Same, but with hashref as argument
519   # See parse_connect_info for explanation
520   ->connect_info(
521     [{
522       dsn         => 'dbi:Pg:dbname=foo',
523       user        => 'postgres',
524       password    => 'my_pg_password',
525       AutoCommit  => 1,
526       quote_char  => q{"},
527       name_sep    => q{.},
528     }]
529   );
530
531   # Subref + DBIx::Class-specific connection options
532   ->connect_info(
533     [
534       sub { DBI->connect(...) },
535       {
536           quote_char => q{`},
537           name_sep => q{@},
538           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
539           disable_sth_caching => 1,
540       },
541     ]
542   );
543
544
545
546 =cut
547
548 sub connect_info {
549   my ($self, $info) = @_;
550
551   return $self->_connect_info if !$info;
552
553   $self->_connect_info($info); # copy for _connect_info
554
555   $info = $self->_normalize_connect_info($info)
556     if ref $info eq 'ARRAY';
557
558   for my $storage_opt (keys %{ $info->{storage_options} }) {
559     my $value = $info->{storage_options}{$storage_opt};
560
561     $self->$storage_opt($value);
562   }
563
564   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
565   #  the new set of options
566   $self->_sql_maker(undef);
567   $self->_sql_maker_opts({});
568
569   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
570     my $value = $info->{sql_maker_options}{$sql_maker_opt};
571
572     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
573   }
574
575   my %attrs = (
576     %{ $self->_default_dbi_connect_attributes || {} },
577     %{ $info->{attributes} || {} },
578   );
579
580   my @args = @{ $info->{arguments} };
581
582   $self->_dbi_connect_info([@args,
583     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
584
585   return $self->_connect_info;
586 }
587
588 sub _normalize_connect_info {
589   my ($self, $info_arg) = @_;
590   my %info;
591
592   my @args = @$info_arg;  # take a shallow copy for further mutilation
593
594   # combine/pre-parse arguments depending on invocation style
595
596   my %attrs;
597   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
598     %attrs = %{ $args[1] || {} };
599     @args = $args[0];
600   }
601   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
602     %attrs = %{$args[0]};
603     @args = ();
604     if (my $code = delete $attrs{dbh_maker}) {
605       @args = $code;
606
607       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
608       if (@ignored) {
609         carp sprintf (
610             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
611           . "to the result of 'dbh_maker'",
612
613           join (', ', map { "'$_'" } (@ignored) ),
614         );
615       }
616     }
617     else {
618       @args = delete @attrs{qw/dsn user password/};
619     }
620   }
621   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
622     %attrs = (
623       % { $args[3] || {} },
624       % { $args[4] || {} },
625     );
626     @args = @args[0,1,2];
627   }
628
629   $info{arguments} = \@args;
630
631   my @storage_opts = grep exists $attrs{$_},
632     @storage_options, 'cursor_class';
633
634   @{ $info{storage_options} }{@storage_opts} =
635     delete @attrs{@storage_opts} if @storage_opts;
636
637   my @sql_maker_opts = grep exists $attrs{$_},
638     qw/limit_dialect quote_char name_sep/;
639
640   @{ $info{sql_maker_options} }{@sql_maker_opts} =
641     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
642
643   $info{attributes} = \%attrs if %attrs;
644
645   return \%info;
646 }
647
648 sub _default_dbi_connect_attributes {
649   return {
650     AutoCommit => 1,
651     RaiseError => 1,
652     PrintError => 0,
653   };
654 }
655
656 =head2 on_connect_do
657
658 This method is deprecated in favour of setting via L</connect_info>.
659
660 =cut
661
662 =head2 on_disconnect_do
663
664 This method is deprecated in favour of setting via L</connect_info>.
665
666 =cut
667
668 sub _parse_connect_do {
669   my ($self, $type) = @_;
670
671   my $val = $self->$type;
672   return () if not defined $val;
673
674   my @res;
675
676   if (not ref($val)) {
677     push @res, [ 'do_sql', $val ];
678   } elsif (ref($val) eq 'CODE') {
679     push @res, $val;
680   } elsif (ref($val) eq 'ARRAY') {
681     push @res, map { [ 'do_sql', $_ ] } @$val;
682   } else {
683     $self->throw_exception("Invalid type for $type: ".ref($val));
684   }
685
686   return \@res;
687 }
688
689 =head2 dbh_do
690
691 Arguments: ($subref | $method_name), @extra_coderef_args?
692
693 Execute the given $subref or $method_name using the new exception-based
694 connection management.
695
696 The first two arguments will be the storage object that C<dbh_do> was called
697 on and a database handle to use.  Any additional arguments will be passed
698 verbatim to the called subref as arguments 2 and onwards.
699
700 Using this (instead of $self->_dbh or $self->dbh) ensures correct
701 exception handling and reconnection (or failover in future subclasses).
702
703 Your subref should have no side-effects outside of the database, as
704 there is the potential for your subref to be partially double-executed
705 if the database connection was stale/dysfunctional.
706
707 Example:
708
709   my @stuff = $schema->storage->dbh_do(
710     sub {
711       my ($storage, $dbh, @cols) = @_;
712       my $cols = join(q{, }, @cols);
713       $dbh->selectrow_array("SELECT $cols FROM foo");
714     },
715     @column_list
716   );
717
718 =cut
719
720 sub dbh_do {
721   my $self = shift;
722   my $code = shift;
723
724   my $dbh = $self->_get_dbh;
725
726   return $self->$code($dbh, @_)
727     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
728
729   local $self->{_in_dbh_do} = 1;
730
731   my @args = @_;
732   return try {
733     $self->$code ($dbh, @args);
734   } catch {
735     $self->throw_exception($_) if $self->connected;
736
737     # We were not connected - reconnect and retry, but let any
738     #  exception fall right through this time
739     carp "Retrying $code after catching disconnected exception: $_"
740       if $ENV{DBIC_DBIRETRY_DEBUG};
741
742     $self->_populate_dbh;
743     $self->$code($self->_dbh, @args);
744   };
745 }
746
747 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
748 # It also informs dbh_do to bypass itself while under the direction of txn_do,
749 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
750 sub txn_do {
751   my $self = shift;
752   my $coderef = shift;
753
754   ref $coderef eq 'CODE' or $self->throw_exception
755     ('$coderef must be a CODE reference');
756
757   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
758
759   local $self->{_in_dbh_do} = 1;
760
761   my @result;
762   my $want_array = wantarray;
763
764   my $tried = 0;
765   while(1) {
766     my $exception;
767     my @args = @_;
768     try {
769       $self->_get_dbh;
770
771       $self->txn_begin;
772       if($want_array) {
773           @result = $coderef->(@args);
774       }
775       elsif(defined $want_array) {
776           $result[0] = $coderef->(@args);
777       }
778       else {
779           $coderef->(@args);
780       }
781       $self->txn_commit;
782     } catch {
783       $exception = $_;
784     };
785
786     if(! defined $exception) { return $want_array ? @result : $result[0] }
787
788     if($tried++ || $self->connected) {
789       my $rollback_exception;
790       try { $self->txn_rollback } catch { $rollback_exception = shift };
791       if(defined $rollback_exception) {
792         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
793         $self->throw_exception($exception)  # propagate nested rollback
794           if $rollback_exception =~ /$exception_class/;
795
796         $self->throw_exception(
797           "Transaction aborted: ${exception}. "
798           . "Rollback failed: ${rollback_exception}"
799         );
800       }
801       $self->throw_exception($exception)
802     }
803
804     # We were not connected, and was first try - reconnect and retry
805     # via the while loop
806     carp "Retrying $coderef after catching disconnected exception: $exception"
807       if $ENV{DBIC_DBIRETRY_DEBUG};
808     $self->_populate_dbh;
809   }
810 }
811
812 =head2 disconnect
813
814 Our C<disconnect> method also performs a rollback first if the
815 database is not in C<AutoCommit> mode.
816
817 =cut
818
819 sub disconnect {
820   my ($self) = @_;
821
822   if( $self->_dbh ) {
823     my @actions;
824
825     push @actions, ( $self->on_disconnect_call || () );
826     push @actions, $self->_parse_connect_do ('on_disconnect_do');
827
828     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
829
830     $self->_dbh_rollback unless $self->_dbh_autocommit;
831
832     %{ $self->_dbh->{CachedKids} } = ();
833     $self->_dbh->disconnect;
834     $self->_dbh(undef);
835     $self->{_dbh_gen}++;
836   }
837 }
838
839 =head2 with_deferred_fk_checks
840
841 =over 4
842
843 =item Arguments: C<$coderef>
844
845 =item Return Value: The return value of $coderef
846
847 =back
848
849 Storage specific method to run the code ref with FK checks deferred or
850 in MySQL's case disabled entirely.
851
852 =cut
853
854 # Storage subclasses should override this
855 sub with_deferred_fk_checks {
856   my ($self, $sub) = @_;
857   $sub->();
858 }
859
860 =head2 connected
861
862 =over
863
864 =item Arguments: none
865
866 =item Return Value: 1|0
867
868 =back
869
870 Verifies that the current database handle is active and ready to execute
871 an SQL statement (e.g. the connection did not get stale, server is still
872 answering, etc.) This method is used internally by L</dbh>.
873
874 =cut
875
876 sub connected {
877   my $self = shift;
878   return 0 unless $self->_seems_connected;
879
880   #be on the safe side
881   local $self->_dbh->{RaiseError} = 1;
882
883   return $self->_ping;
884 }
885
886 sub _seems_connected {
887   my $self = shift;
888
889   $self->_preserve_foreign_dbh;
890
891   my $dbh = $self->_dbh
892     or return 0;
893
894   return $dbh->FETCH('Active');
895 }
896
897 sub _ping {
898   my $self = shift;
899
900   my $dbh = $self->_dbh or return 0;
901
902   return $dbh->ping;
903 }
904
905 sub ensure_connected {
906   my ($self) = @_;
907
908   unless ($self->connected) {
909     $self->_populate_dbh;
910   }
911 }
912
913 =head2 dbh
914
915 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
916 is guaranteed to be healthy by implicitly calling L</connected>, and if
917 necessary performing a reconnection before returning. Keep in mind that this
918 is very B<expensive> on some database engines. Consider using L</dbh_do>
919 instead.
920
921 =cut
922
923 sub dbh {
924   my ($self) = @_;
925
926   if (not $self->_dbh) {
927     $self->_populate_dbh;
928   } else {
929     $self->ensure_connected;
930   }
931   return $self->_dbh;
932 }
933
934 # this is the internal "get dbh or connect (don't check)" method
935 sub _get_dbh {
936   my $self = shift;
937   $self->_preserve_foreign_dbh;
938   $self->_populate_dbh unless $self->_dbh;
939   return $self->_dbh;
940 }
941
942 sub _sql_maker_args {
943     my ($self) = @_;
944
945     return (
946       bindtype=>'columns',
947       array_datatypes => 1,
948       limit_dialect => $self->_get_dbh,
949       %{$self->_sql_maker_opts}
950     );
951 }
952
953 sub sql_maker {
954   my ($self) = @_;
955   unless ($self->_sql_maker) {
956     my $sql_maker_class = $self->sql_maker_class;
957     $self->ensure_class_loaded ($sql_maker_class);
958     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
959   }
960   return $self->_sql_maker;
961 }
962
963 # nothing to do by default
964 sub _rebless {}
965 sub _init {}
966
967 sub _populate_dbh {
968   my ($self) = @_;
969
970   my @info = @{$self->_dbi_connect_info || []};
971   $self->_dbh(undef); # in case ->connected failed we might get sent here
972   $self->_server_info_hash (undef);
973   $self->_dbh($self->_connect(@info));
974
975   $self->_conn_pid($$);
976   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
977
978   $self->_determine_driver;
979
980   # Always set the transaction depth on connect, since
981   #  there is no transaction in progress by definition
982   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
983
984   $self->_run_connection_actions unless $self->{_in_determine_driver};
985 }
986
987 sub _run_connection_actions {
988   my $self = shift;
989   my @actions;
990
991   push @actions, ( $self->on_connect_call || () );
992   push @actions, $self->_parse_connect_do ('on_connect_do');
993
994   $self->_do_connection_actions(connect_call_ => $_) for @actions;
995 }
996
997 sub _server_info {
998   my $self = shift;
999
1000   unless ($self->_server_info_hash) {
1001
1002     my %info;
1003
1004     my $server_version = try { $self->_get_server_version };
1005
1006     if (defined $server_version) {
1007       $info{dbms_version} = $server_version;
1008
1009       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1010       my @verparts = split (/\./, $numeric_version);
1011       if (
1012         @verparts
1013           &&
1014         $verparts[0] <= 999
1015       ) {
1016         # consider only up to 3 version parts, iff not more than 3 digits
1017         my @use_parts;
1018         while (@verparts && @use_parts < 3) {
1019           my $p = shift @verparts;
1020           last if $p > 999;
1021           push @use_parts, $p;
1022         }
1023         push @use_parts, 0 while @use_parts < 3;
1024
1025         $info{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1026       }
1027     }
1028
1029     $self->_server_info_hash(\%info);
1030   }
1031
1032   return $self->_server_info_hash
1033 }
1034
1035 sub _get_server_version {
1036   shift->_get_dbh->get_info(18);
1037 }
1038
1039 sub _determine_driver {
1040   my ($self) = @_;
1041
1042   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1043     my $started_connected = 0;
1044     local $self->{_in_determine_driver} = 1;
1045
1046     if (ref($self) eq __PACKAGE__) {
1047       my $driver;
1048       if ($self->_dbh) { # we are connected
1049         $driver = $self->_dbh->{Driver}{Name};
1050         $started_connected = 1;
1051       } else {
1052         # if connect_info is a CODEREF, we have no choice but to connect
1053         if (ref $self->_dbi_connect_info->[0] &&
1054             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
1055           $self->_populate_dbh;
1056           $driver = $self->_dbh->{Driver}{Name};
1057         }
1058         else {
1059           # try to use dsn to not require being connected, the driver may still
1060           # force a connection in _rebless to determine version
1061           # (dsn may not be supplied at all if all we do is make a mock-schema)
1062           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1063           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1064           $driver ||= $ENV{DBI_DRIVER};
1065         }
1066       }
1067
1068       if ($driver) {
1069         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1070         if ($self->load_optional_class($storage_class)) {
1071           mro::set_mro($storage_class, 'c3');
1072           bless $self, $storage_class;
1073           $self->_rebless();
1074         }
1075       }
1076     }
1077
1078     $self->_driver_determined(1);
1079
1080     $self->_init; # run driver-specific initializations
1081
1082     $self->_run_connection_actions
1083         if !$started_connected && defined $self->_dbh;
1084   }
1085 }
1086
1087 sub _do_connection_actions {
1088   my $self          = shift;
1089   my $method_prefix = shift;
1090   my $call          = shift;
1091
1092   if (not ref($call)) {
1093     my $method = $method_prefix . $call;
1094     $self->$method(@_);
1095   } elsif (ref($call) eq 'CODE') {
1096     $self->$call(@_);
1097   } elsif (ref($call) eq 'ARRAY') {
1098     if (ref($call->[0]) ne 'ARRAY') {
1099       $self->_do_connection_actions($method_prefix, $_) for @$call;
1100     } else {
1101       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1102     }
1103   } else {
1104     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1105   }
1106
1107   return $self;
1108 }
1109
1110 sub connect_call_do_sql {
1111   my $self = shift;
1112   $self->_do_query(@_);
1113 }
1114
1115 sub disconnect_call_do_sql {
1116   my $self = shift;
1117   $self->_do_query(@_);
1118 }
1119
1120 # override in db-specific backend when necessary
1121 sub connect_call_datetime_setup { 1 }
1122
1123 sub _do_query {
1124   my ($self, $action) = @_;
1125
1126   if (ref $action eq 'CODE') {
1127     $action = $action->($self);
1128     $self->_do_query($_) foreach @$action;
1129   }
1130   else {
1131     # Most debuggers expect ($sql, @bind), so we need to exclude
1132     # the attribute hash which is the second argument to $dbh->do
1133     # furthermore the bind values are usually to be presented
1134     # as named arrayref pairs, so wrap those here too
1135     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1136     my $sql = shift @do_args;
1137     my $attrs = shift @do_args;
1138     my @bind = map { [ undef, $_ ] } @do_args;
1139
1140     $self->_query_start($sql, @bind);
1141     $self->_get_dbh->do($sql, $attrs, @do_args);
1142     $self->_query_end($sql, @bind);
1143   }
1144
1145   return $self;
1146 }
1147
1148 sub _connect {
1149   my ($self, @info) = @_;
1150
1151   $self->throw_exception("You failed to provide any connection info")
1152     if !@info;
1153
1154   my ($old_connect_via, $dbh);
1155
1156   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1157     $old_connect_via = $DBI::connect_via;
1158     $DBI::connect_via = 'connect';
1159   }
1160
1161   try {
1162     if(ref $info[0] eq 'CODE') {
1163        $dbh = $info[0]->();
1164     }
1165     else {
1166        $dbh = DBI->connect(@info);
1167     }
1168
1169     if (!$dbh) {
1170       die $DBI::errstr;
1171     }
1172
1173     unless ($self->unsafe) {
1174       my $weak_self = $self;
1175       Scalar::Util::weaken($weak_self);
1176       $dbh->{HandleError} = sub {
1177           if ($weak_self) {
1178             $weak_self->throw_exception("DBI Exception: $_[0]");
1179           }
1180           else {
1181             # the handler may be invoked by something totally out of
1182             # the scope of DBIC
1183             croak ("DBI Exception: $_[0]");
1184           }
1185       };
1186       $dbh->{ShowErrorStatement} = 1;
1187       $dbh->{RaiseError} = 1;
1188       $dbh->{PrintError} = 0;
1189     }
1190   }
1191   catch {
1192     $self->throw_exception("DBI Connection failed: $_")
1193   }
1194   finally {
1195     $DBI::connect_via = $old_connect_via if $old_connect_via;
1196   };
1197
1198   $self->_dbh_autocommit($dbh->{AutoCommit});
1199   $dbh;
1200 }
1201
1202 sub svp_begin {
1203   my ($self, $name) = @_;
1204
1205   $name = $self->_svp_generate_name
1206     unless defined $name;
1207
1208   $self->throw_exception ("You can't use savepoints outside a transaction")
1209     if $self->{transaction_depth} == 0;
1210
1211   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1212     unless $self->can('_svp_begin');
1213
1214   push @{ $self->{savepoints} }, $name;
1215
1216   $self->debugobj->svp_begin($name) if $self->debug;
1217
1218   return $self->_svp_begin($name);
1219 }
1220
1221 sub svp_release {
1222   my ($self, $name) = @_;
1223
1224   $self->throw_exception ("You can't use savepoints outside a transaction")
1225     if $self->{transaction_depth} == 0;
1226
1227   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1228     unless $self->can('_svp_release');
1229
1230   if (defined $name) {
1231     $self->throw_exception ("Savepoint '$name' does not exist")
1232       unless grep { $_ eq $name } @{ $self->{savepoints} };
1233
1234     # Dig through the stack until we find the one we are releasing.  This keeps
1235     # the stack up to date.
1236     my $svp;
1237
1238     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1239   } else {
1240     $name = pop @{ $self->{savepoints} };
1241   }
1242
1243   $self->debugobj->svp_release($name) if $self->debug;
1244
1245   return $self->_svp_release($name);
1246 }
1247
1248 sub svp_rollback {
1249   my ($self, $name) = @_;
1250
1251   $self->throw_exception ("You can't use savepoints outside a transaction")
1252     if $self->{transaction_depth} == 0;
1253
1254   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1255     unless $self->can('_svp_rollback');
1256
1257   if (defined $name) {
1258       # If they passed us a name, verify that it exists in the stack
1259       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1260           $self->throw_exception("Savepoint '$name' does not exist!");
1261       }
1262
1263       # Dig through the stack until we find the one we are releasing.  This keeps
1264       # the stack up to date.
1265       while(my $s = pop(@{ $self->{savepoints} })) {
1266           last if($s eq $name);
1267       }
1268       # Add the savepoint back to the stack, as a rollback doesn't remove the
1269       # named savepoint, only everything after it.
1270       push(@{ $self->{savepoints} }, $name);
1271   } else {
1272       # We'll assume they want to rollback to the last savepoint
1273       $name = $self->{savepoints}->[-1];
1274   }
1275
1276   $self->debugobj->svp_rollback($name) if $self->debug;
1277
1278   return $self->_svp_rollback($name);
1279 }
1280
1281 sub _svp_generate_name {
1282     my ($self) = @_;
1283
1284     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1285 }
1286
1287 sub txn_begin {
1288   my $self = shift;
1289
1290   # this means we have not yet connected and do not know the AC status
1291   # (e.g. coderef $dbh)
1292   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1293
1294   if($self->{transaction_depth} == 0) {
1295     $self->debugobj->txn_begin()
1296       if $self->debug;
1297     $self->_dbh_begin_work;
1298   }
1299   elsif ($self->auto_savepoint) {
1300     $self->svp_begin;
1301   }
1302   $self->{transaction_depth}++;
1303 }
1304
1305 sub _dbh_begin_work {
1306   my $self = shift;
1307
1308   # if the user is utilizing txn_do - good for him, otherwise we need to
1309   # ensure that the $dbh is healthy on BEGIN.
1310   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1311   # will be replaced by a failure of begin_work itself (which will be
1312   # then retried on reconnect)
1313   if ($self->{_in_dbh_do}) {
1314     $self->_dbh->begin_work;
1315   } else {
1316     $self->dbh_do(sub { $_[1]->begin_work });
1317   }
1318 }
1319
1320 sub txn_commit {
1321   my $self = shift;
1322   if ($self->{transaction_depth} == 1) {
1323     $self->debugobj->txn_commit()
1324       if ($self->debug);
1325     $self->_dbh_commit;
1326     $self->{transaction_depth} = 0
1327       if $self->_dbh_autocommit;
1328   }
1329   elsif($self->{transaction_depth} > 1) {
1330     $self->{transaction_depth}--;
1331     $self->svp_release
1332       if $self->auto_savepoint;
1333   }
1334 }
1335
1336 sub _dbh_commit {
1337   my $self = shift;
1338   my $dbh  = $self->_dbh
1339     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1340   $dbh->commit;
1341 }
1342
1343 sub txn_rollback {
1344   my $self = shift;
1345   my $dbh = $self->_dbh;
1346   try {
1347     if ($self->{transaction_depth} == 1) {
1348       $self->debugobj->txn_rollback()
1349         if ($self->debug);
1350       $self->{transaction_depth} = 0
1351         if $self->_dbh_autocommit;
1352       $self->_dbh_rollback;
1353     }
1354     elsif($self->{transaction_depth} > 1) {
1355       $self->{transaction_depth}--;
1356       if ($self->auto_savepoint) {
1357         $self->svp_rollback;
1358         $self->svp_release;
1359       }
1360     }
1361     else {
1362       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1363     }
1364   }
1365   catch {
1366     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1367
1368     if ($_ !~ /$exception_class/) {
1369       # ensure that a failed rollback resets the transaction depth
1370       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1371     }
1372
1373     $self->throw_exception($_)
1374   };
1375 }
1376
1377 sub _dbh_rollback {
1378   my $self = shift;
1379   my $dbh  = $self->_dbh
1380     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1381   $dbh->rollback;
1382 }
1383
1384 # This used to be the top-half of _execute.  It was split out to make it
1385 #  easier to override in NoBindVars without duping the rest.  It takes up
1386 #  all of _execute's args, and emits $sql, @bind.
1387 sub _prep_for_execute {
1388   my ($self, $op, $extra_bind, $ident, $args) = @_;
1389
1390   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1391     $ident = $ident->from();
1392   }
1393
1394   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1395
1396   unshift(@bind,
1397     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1398       if $extra_bind;
1399   return ($sql, \@bind);
1400 }
1401
1402
1403 sub _fix_bind_params {
1404     my ($self, @bind) = @_;
1405
1406     ### Turn @bind from something like this:
1407     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1408     ### to this:
1409     ###   ( "'1'", "'1'", "'3'" )
1410     return
1411         map {
1412             if ( defined( $_ && $_->[1] ) ) {
1413                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1414             }
1415             else { q{'NULL'}; }
1416         } @bind;
1417 }
1418
1419 sub _query_start {
1420     my ( $self, $sql, @bind ) = @_;
1421
1422     if ( $self->debug ) {
1423         @bind = $self->_fix_bind_params(@bind);
1424
1425         $self->debugobj->query_start( $sql, @bind );
1426     }
1427 }
1428
1429 sub _query_end {
1430     my ( $self, $sql, @bind ) = @_;
1431
1432     if ( $self->debug ) {
1433         @bind = $self->_fix_bind_params(@bind);
1434         $self->debugobj->query_end( $sql, @bind );
1435     }
1436 }
1437
1438 sub _dbh_execute {
1439   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1440
1441   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1442
1443   $self->_query_start( $sql, @$bind );
1444
1445   my $sth = $self->sth($sql,$op);
1446
1447   my $placeholder_index = 1;
1448
1449   foreach my $bound (@$bind) {
1450     my $attributes = {};
1451     my($column_name, @data) = @$bound;
1452
1453     if ($bind_attributes) {
1454       $attributes = $bind_attributes->{$column_name}
1455       if defined $bind_attributes->{$column_name};
1456     }
1457
1458     foreach my $data (@data) {
1459       my $ref = ref $data;
1460       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1461
1462       $sth->bind_param($placeholder_index, $data, $attributes);
1463       $placeholder_index++;
1464     }
1465   }
1466
1467   # Can this fail without throwing an exception anyways???
1468   my $rv = $sth->execute();
1469   $self->throw_exception($sth->errstr) if !$rv;
1470
1471   $self->_query_end( $sql, @$bind );
1472
1473   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1474 }
1475
1476 sub _execute {
1477     my $self = shift;
1478     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1479 }
1480
1481 sub _prefetch_insert_auto_nextvals {
1482   my ($self, $source, $to_insert) = @_;
1483
1484   my $upd = {};
1485
1486   foreach my $col ( $source->columns ) {
1487     if ( !defined $to_insert->{$col} ) {
1488       my $col_info = $source->column_info($col);
1489
1490       if ( $col_info->{auto_nextval} ) {
1491         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1492           'nextval',
1493           $col_info->{sequence} ||=
1494             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1495         );
1496       }
1497     }
1498   }
1499
1500   return $upd;
1501 }
1502
1503 sub insert {
1504   my $self = shift;
1505   my ($source, $to_insert, $opts) = @_;
1506
1507   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1508
1509   my $bind_attributes = $self->source_bind_attributes($source);
1510
1511   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1512
1513   if ($opts->{returning}) {
1514     my @ret_cols = @{$opts->{returning}};
1515
1516     my @ret_vals = try {
1517       local $SIG{__WARN__} = sub {};
1518       my @r = $sth->fetchrow_array;
1519       $sth->finish;
1520       @r;
1521     };
1522
1523     my %ret;
1524     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1525
1526     $updated_cols = {
1527       %$updated_cols,
1528       %ret,
1529     };
1530   }
1531
1532   return $updated_cols;
1533 }
1534
1535 ## Currently it is assumed that all values passed will be "normal", i.e. not
1536 ## scalar refs, or at least, all the same type as the first set, the statement is
1537 ## only prepped once.
1538 sub insert_bulk {
1539   my ($self, $source, $cols, $data) = @_;
1540
1541   my %colvalues;
1542   @colvalues{@$cols} = (0..$#$cols);
1543
1544   for my $i (0..$#$cols) {
1545     my $first_val = $data->[0][$i];
1546     next unless ref $first_val eq 'SCALAR';
1547
1548     $colvalues{ $cols->[$i] } = $first_val;
1549   }
1550
1551   # check for bad data and stringify stringifiable objects
1552   my $bad_slice = sub {
1553     my ($msg, $col_idx, $slice_idx) = @_;
1554     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1555       $msg,
1556       $cols->[$col_idx],
1557       do {
1558         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1559         Data::Dumper::Concise::Dumper({
1560           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1561         }),
1562       }
1563     );
1564   };
1565
1566   for my $datum_idx (0..$#$data) {
1567     my $datum = $data->[$datum_idx];
1568
1569     for my $col_idx (0..$#$cols) {
1570       my $val            = $datum->[$col_idx];
1571       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1572       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1573
1574       if ($is_literal_sql) {
1575         if (not ref $val) {
1576           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1577         }
1578         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1579           $bad_slice->("$reftype reference found where literal SQL expected",
1580             $col_idx, $datum_idx);
1581         }
1582         elsif ($$val ne $$sqla_bind){
1583           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1584             $col_idx, $datum_idx);
1585         }
1586       }
1587       elsif (my $reftype = ref $val) {
1588         require overload;
1589         if (overload::Method($val, '""')) {
1590           $datum->[$col_idx] = "".$val;
1591         }
1592         else {
1593           $bad_slice->("$reftype reference found where bind expected",
1594             $col_idx, $datum_idx);
1595         }
1596       }
1597     }
1598   }
1599
1600   my ($sql, $bind) = $self->_prep_for_execute (
1601     'insert', undef, $source, [\%colvalues]
1602   );
1603   my @bind = @$bind;
1604
1605   my $empty_bind = 1 if (not @bind) &&
1606     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1607
1608   if ((not @bind) && (not $empty_bind)) {
1609     $self->throw_exception(
1610       'Cannot insert_bulk without support for placeholders'
1611     );
1612   }
1613
1614   # neither _execute_array, nor _execute_inserts_with_no_binds are
1615   # atomic (even if _execute _array is a single call). Thus a safety
1616   # scope guard
1617   my $guard = $self->txn_scope_guard;
1618
1619   $self->_query_start( $sql, ['__BULK__'] );
1620   my $sth = $self->sth($sql);
1621   my $rv = do {
1622     if ($empty_bind) {
1623       # bind_param_array doesn't work if there are no binds
1624       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1625     }
1626     else {
1627 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1628       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1629     }
1630   };
1631
1632   $self->_query_end( $sql, ['__BULK__'] );
1633
1634   $guard->commit;
1635
1636   return (wantarray ? ($rv, $sth, @bind) : $rv);
1637 }
1638
1639 sub _execute_array {
1640   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1641
1642   ## This must be an arrayref, else nothing works!
1643   my $tuple_status = [];
1644
1645   ## Get the bind_attributes, if any exist
1646   my $bind_attributes = $self->source_bind_attributes($source);
1647
1648   ## Bind the values and execute
1649   my $placeholder_index = 1;
1650
1651   foreach my $bound (@$bind) {
1652
1653     my $attributes = {};
1654     my ($column_name, $data_index) = @$bound;
1655
1656     if( $bind_attributes ) {
1657       $attributes = $bind_attributes->{$column_name}
1658       if defined $bind_attributes->{$column_name};
1659     }
1660
1661     my @data = map { $_->[$data_index] } @$data;
1662
1663     $sth->bind_param_array(
1664       $placeholder_index,
1665       [@data],
1666       (%$attributes ?  $attributes : ()),
1667     );
1668     $placeholder_index++;
1669   }
1670
1671   my ($rv, $err);
1672   try {
1673     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1674   }
1675   catch {
1676     $err = shift;
1677   }
1678   finally {
1679     # Statement must finish even if there was an exception.
1680     try {
1681       $sth->finish
1682     }
1683     catch {
1684       $err = shift unless defined $err
1685     };
1686   };
1687
1688   $err = $sth->errstr
1689     if (! defined $err and $sth->err);
1690
1691   if (defined $err) {
1692     my $i = 0;
1693     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1694
1695     $self->throw_exception("Unexpected populate error: $err")
1696       if ($i > $#$tuple_status);
1697
1698     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1699       ($tuple_status->[$i][1] || $err),
1700       Data::Dumper::Concise::Dumper({
1701         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1702       }),
1703     );
1704   }
1705
1706   return $rv;
1707 }
1708
1709 sub _dbh_execute_array {
1710     my ($self, $sth, $tuple_status, @extra) = @_;
1711
1712     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1713 }
1714
1715 sub _dbh_execute_inserts_with_no_binds {
1716   my ($self, $sth, $count) = @_;
1717
1718   my $err;
1719   try {
1720     my $dbh = $self->_get_dbh;
1721     local $dbh->{RaiseError} = 1;
1722     local $dbh->{PrintError} = 0;
1723
1724     $sth->execute foreach 1..$count;
1725   }
1726   catch {
1727     $err = shift;
1728   }
1729   finally {
1730     # Make sure statement is finished even if there was an exception.
1731     try {
1732       $sth->finish
1733     }
1734     catch {
1735       $err = shift unless defined $err;
1736     };
1737   };
1738
1739   $self->throw_exception($err) if defined $err;
1740
1741   return $count;
1742 }
1743
1744 sub update {
1745   my ($self, $source, @args) = @_;
1746
1747   my $bind_attrs = $self->source_bind_attributes($source);
1748
1749   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1750 }
1751
1752
1753 sub delete {
1754   my ($self, $source, @args) = @_;
1755
1756   my $bind_attrs = $self->source_bind_attributes($source);
1757
1758   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1759 }
1760
1761 # We were sent here because the $rs contains a complex search
1762 # which will require a subquery to select the correct rows
1763 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1764 #
1765 # Generating a single PK column subquery is trivial and supported
1766 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1767 # Look at _multipk_update_delete()
1768 sub _subq_update_delete {
1769   my $self = shift;
1770   my ($rs, $op, $values) = @_;
1771
1772   my $rsrc = $rs->result_source;
1773
1774   # quick check if we got a sane rs on our hands
1775   my @pcols = $rsrc->_pri_cols;
1776
1777   my $sel = $rs->_resolved_attrs->{select};
1778   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1779
1780   if (
1781       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1782         ne
1783       join ("\x00", sort @$sel )
1784   ) {
1785     $self->throw_exception (
1786       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1787     );
1788   }
1789
1790   if (@pcols == 1) {
1791     return $self->$op (
1792       $rsrc,
1793       $op eq 'update' ? $values : (),
1794       { $pcols[0] => { -in => $rs->as_query } },
1795     );
1796   }
1797
1798   else {
1799     return $self->_multipk_update_delete (@_);
1800   }
1801 }
1802
1803 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1804 # resultset update/delete involving subqueries. So by default resort
1805 # to simple (and inefficient) delete_all style per-row opearations,
1806 # while allowing specific storages to override this with a faster
1807 # implementation.
1808 #
1809 sub _multipk_update_delete {
1810   return shift->_per_row_update_delete (@_);
1811 }
1812
1813 # This is the default loop used to delete/update rows for multi PK
1814 # resultsets, and used by mysql exclusively (because it can't do anything
1815 # else).
1816 #
1817 # We do not use $row->$op style queries, because resultset update/delete
1818 # is not expected to cascade (this is what delete_all/update_all is for).
1819 #
1820 # There should be no race conditions as the entire operation is rolled
1821 # in a transaction.
1822 #
1823 sub _per_row_update_delete {
1824   my $self = shift;
1825   my ($rs, $op, $values) = @_;
1826
1827   my $rsrc = $rs->result_source;
1828   my @pcols = $rsrc->_pri_cols;
1829
1830   my $guard = $self->txn_scope_guard;
1831
1832   # emulate the return value of $sth->execute for non-selects
1833   my $row_cnt = '0E0';
1834
1835   my $subrs_cur = $rs->cursor;
1836   my @all_pk = $subrs_cur->all;
1837   for my $pks ( @all_pk) {
1838
1839     my $cond;
1840     for my $i (0.. $#pcols) {
1841       $cond->{$pcols[$i]} = $pks->[$i];
1842     }
1843
1844     $self->$op (
1845       $rsrc,
1846       $op eq 'update' ? $values : (),
1847       $cond,
1848     );
1849
1850     $row_cnt++;
1851   }
1852
1853   $guard->commit;
1854
1855   return $row_cnt;
1856 }
1857
1858 sub _select {
1859   my $self = shift;
1860   $self->_execute($self->_select_args(@_));
1861 }
1862
1863 sub _select_args_to_query {
1864   my $self = shift;
1865
1866   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1867   #  = $self->_select_args($ident, $select, $cond, $attrs);
1868   my ($op, $bind, $ident, $bind_attrs, @args) =
1869     $self->_select_args(@_);
1870
1871   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1872   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1873   $prepared_bind ||= [];
1874
1875   return wantarray
1876     ? ($sql, $prepared_bind, $bind_attrs)
1877     : \[ "($sql)", @$prepared_bind ]
1878   ;
1879 }
1880
1881 sub _select_args {
1882   my ($self, $ident, $select, $where, $attrs) = @_;
1883
1884   my $sql_maker = $self->sql_maker;
1885   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1886
1887   $attrs = {
1888     %$attrs,
1889     select => $select,
1890     from => $ident,
1891     where => $where,
1892     $rs_alias && $alias2source->{$rs_alias}
1893       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
1894       : ()
1895     ,
1896   };
1897
1898   # calculate bind_attrs before possible $ident mangling
1899   my $bind_attrs = {};
1900   for my $alias (keys %$alias2source) {
1901     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1902     for my $col (keys %$bindtypes) {
1903
1904       my $fqcn = join ('.', $alias, $col);
1905       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1906
1907       # Unqialified column names are nice, but at the same time can be
1908       # rather ambiguous. What we do here is basically go along with
1909       # the loop, adding an unqualified column slot to $bind_attrs,
1910       # alongside the fully qualified name. As soon as we encounter
1911       # another column by that name (which would imply another table)
1912       # we unset the unqualified slot and never add any info to it
1913       # to avoid erroneous type binding. If this happens the users
1914       # only choice will be to fully qualify his column name
1915
1916       if (exists $bind_attrs->{$col}) {
1917         $bind_attrs->{$col} = {};
1918       }
1919       else {
1920         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1921       }
1922     }
1923   }
1924
1925   # adjust limits
1926   if (defined $attrs->{rows}) {
1927     $self->throw_exception("rows attribute must be positive if present")
1928       unless $attrs->{rows} > 0;
1929   }
1930   elsif (defined $attrs->{offset}) {
1931     # MySQL actually recommends this approach.  I cringe.
1932     $attrs->{rows} = 2**32;
1933   }
1934
1935   my @limit;
1936
1937   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
1938   # storage, unless software limit was requested
1939   if (
1940     #limited has_many
1941     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1942        ||
1943     # grouped prefetch (to satisfy group_by == select)
1944     ( $attrs->{group_by}
1945         &&
1946       @{$attrs->{group_by}}
1947         &&
1948       $attrs->{_prefetch_select}
1949         &&
1950       @{$attrs->{_prefetch_select}}
1951     )
1952   ) {
1953     ($ident, $select, $where, $attrs)
1954       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1955   }
1956   elsif (! $attrs->{software_limit} ) {
1957     push @limit, $attrs->{rows}, $attrs->{offset};
1958   }
1959
1960   # try to simplify the joinmap further (prune unreferenced type-single joins)
1961   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
1962
1963 ###
1964   # This would be the point to deflate anything found in $where
1965   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1966   # expect a row object. And all we have is a resultsource (it is trivial
1967   # to extract deflator coderefs via $alias2source above).
1968   #
1969   # I don't see a way forward other than changing the way deflators are
1970   # invoked, and that's just bad...
1971 ###
1972
1973   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
1974 }
1975
1976 # Returns a counting SELECT for a simple count
1977 # query. Abstracted so that a storage could override
1978 # this to { count => 'firstcol' } or whatever makes
1979 # sense as a performance optimization
1980 sub _count_select {
1981   #my ($self, $source, $rs_attrs) = @_;
1982   return { count => '*' };
1983 }
1984
1985
1986 sub source_bind_attributes {
1987   my ($self, $source) = @_;
1988
1989   my $bind_attributes;
1990   foreach my $column ($source->columns) {
1991
1992     my $data_type = $source->column_info($column)->{data_type} || '';
1993     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1994      if $data_type;
1995   }
1996
1997   return $bind_attributes;
1998 }
1999
2000 =head2 select
2001
2002 =over 4
2003
2004 =item Arguments: $ident, $select, $condition, $attrs
2005
2006 =back
2007
2008 Handle a SQL select statement.
2009
2010 =cut
2011
2012 sub select {
2013   my $self = shift;
2014   my ($ident, $select, $condition, $attrs) = @_;
2015   return $self->cursor_class->new($self, \@_, $attrs);
2016 }
2017
2018 sub select_single {
2019   my $self = shift;
2020   my ($rv, $sth, @bind) = $self->_select(@_);
2021   my @row = $sth->fetchrow_array;
2022   my @nextrow = $sth->fetchrow_array if @row;
2023   if(@row && @nextrow) {
2024     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2025   }
2026   # Need to call finish() to work round broken DBDs
2027   $sth->finish();
2028   return @row;
2029 }
2030
2031 =head2 sth
2032
2033 =over 4
2034
2035 =item Arguments: $sql
2036
2037 =back
2038
2039 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2040
2041 =cut
2042
2043 sub _dbh_sth {
2044   my ($self, $dbh, $sql) = @_;
2045
2046   # 3 is the if_active parameter which avoids active sth re-use
2047   my $sth = $self->disable_sth_caching
2048     ? $dbh->prepare($sql)
2049     : $dbh->prepare_cached($sql, {}, 3);
2050
2051   # XXX You would think RaiseError would make this impossible,
2052   #  but apparently that's not true :(
2053   $self->throw_exception($dbh->errstr) if !$sth;
2054
2055   $sth;
2056 }
2057
2058 sub sth {
2059   my ($self, $sql) = @_;
2060   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2061 }
2062
2063 sub _dbh_columns_info_for {
2064   my ($self, $dbh, $table) = @_;
2065
2066   if ($dbh->can('column_info')) {
2067     my %result;
2068     my $caught;
2069     try {
2070       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2071       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2072       $sth->execute();
2073       while ( my $info = $sth->fetchrow_hashref() ){
2074         my %column_info;
2075         $column_info{data_type}   = $info->{TYPE_NAME};
2076         $column_info{size}      = $info->{COLUMN_SIZE};
2077         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2078         $column_info{default_value} = $info->{COLUMN_DEF};
2079         my $col_name = $info->{COLUMN_NAME};
2080         $col_name =~ s/^\"(.*)\"$/$1/;
2081
2082         $result{$col_name} = \%column_info;
2083       }
2084     } catch {
2085       $caught = 1;
2086     };
2087     return \%result if !$caught && scalar keys %result;
2088   }
2089
2090   my %result;
2091   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2092   $sth->execute;
2093   my @columns = @{$sth->{NAME_lc}};
2094   for my $i ( 0 .. $#columns ){
2095     my %column_info;
2096     $column_info{data_type} = $sth->{TYPE}->[$i];
2097     $column_info{size} = $sth->{PRECISION}->[$i];
2098     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2099
2100     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2101       $column_info{data_type} = $1;
2102       $column_info{size}    = $2;
2103     }
2104
2105     $result{$columns[$i]} = \%column_info;
2106   }
2107   $sth->finish;
2108
2109   foreach my $col (keys %result) {
2110     my $colinfo = $result{$col};
2111     my $type_num = $colinfo->{data_type};
2112     my $type_name;
2113     if(defined $type_num && $dbh->can('type_info')) {
2114       my $type_info = $dbh->type_info($type_num);
2115       $type_name = $type_info->{TYPE_NAME} if $type_info;
2116       $colinfo->{data_type} = $type_name if $type_name;
2117     }
2118   }
2119
2120   return \%result;
2121 }
2122
2123 sub columns_info_for {
2124   my ($self, $table) = @_;
2125   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2126 }
2127
2128 =head2 last_insert_id
2129
2130 Return the row id of the last insert.
2131
2132 =cut
2133
2134 sub _dbh_last_insert_id {
2135     my ($self, $dbh, $source, $col) = @_;
2136
2137     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2138
2139     return $id if defined $id;
2140
2141     my $class = ref $self;
2142     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2143 }
2144
2145 sub last_insert_id {
2146   my $self = shift;
2147   $self->_dbh_last_insert_id ($self->_dbh, @_);
2148 }
2149
2150 =head2 _native_data_type
2151
2152 =over 4
2153
2154 =item Arguments: $type_name
2155
2156 =back
2157
2158 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2159 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2160 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2161
2162 The default implementation returns C<undef>, implement in your Storage driver if
2163 you need this functionality.
2164
2165 Should map types from other databases to the native RDBMS type, for example
2166 C<VARCHAR2> to C<VARCHAR>.
2167
2168 Types with modifiers should map to the underlying data type. For example,
2169 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2170
2171 Composite types should map to the container type, for example
2172 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2173
2174 =cut
2175
2176 sub _native_data_type {
2177   #my ($self, $data_type) = @_;
2178   return undef
2179 }
2180
2181 # Check if placeholders are supported at all
2182 sub _placeholders_supported {
2183   my $self = shift;
2184   my $dbh  = $self->_get_dbh;
2185
2186   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2187   # but it is inaccurate more often than not
2188   return try {
2189     local $dbh->{PrintError} = 0;
2190     local $dbh->{RaiseError} = 1;
2191     $dbh->do('select ?', {}, 1);
2192     1;
2193   }
2194   catch {
2195     0;
2196   };
2197 }
2198
2199 # Check if placeholders bound to non-string types throw exceptions
2200 #
2201 sub _typeless_placeholders_supported {
2202   my $self = shift;
2203   my $dbh  = $self->_get_dbh;
2204
2205   return try {
2206     local $dbh->{PrintError} = 0;
2207     local $dbh->{RaiseError} = 1;
2208     # this specifically tests a bind that is NOT a string
2209     $dbh->do('select 1 where 1 = ?', {}, 1);
2210     1;
2211   }
2212   catch {
2213     0;
2214   };
2215 }
2216
2217 =head2 sqlt_type
2218
2219 Returns the database driver name.
2220
2221 =cut
2222
2223 sub sqlt_type {
2224   shift->_get_dbh->{Driver}->{Name};
2225 }
2226
2227 =head2 bind_attribute_by_data_type
2228
2229 Given a datatype from column info, returns a database specific bind
2230 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2231 let the database planner just handle it.
2232
2233 Generally only needed for special case column types, like bytea in postgres.
2234
2235 =cut
2236
2237 sub bind_attribute_by_data_type {
2238     return;
2239 }
2240
2241 =head2 is_datatype_numeric
2242
2243 Given a datatype from column_info, returns a boolean value indicating if
2244 the current RDBMS considers it a numeric value. This controls how
2245 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2246 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2247 be performed instead of the usual C<eq>.
2248
2249 =cut
2250
2251 sub is_datatype_numeric {
2252   my ($self, $dt) = @_;
2253
2254   return 0 unless $dt;
2255
2256   return $dt =~ /^ (?:
2257     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2258   ) $/ix;
2259 }
2260
2261
2262 =head2 create_ddl_dir
2263
2264 =over 4
2265
2266 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2267
2268 =back
2269
2270 Creates a SQL file based on the Schema, for each of the specified
2271 database engines in C<\@databases> in the given directory.
2272 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2273
2274 Given a previous version number, this will also create a file containing
2275 the ALTER TABLE statements to transform the previous schema into the
2276 current one. Note that these statements may contain C<DROP TABLE> or
2277 C<DROP COLUMN> statements that can potentially destroy data.
2278
2279 The file names are created using the C<ddl_filename> method below, please
2280 override this method in your schema if you would like a different file
2281 name format. For the ALTER file, the same format is used, replacing
2282 $version in the name with "$preversion-$version".
2283
2284 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2285 The most common value for this would be C<< { add_drop_table => 1 } >>
2286 to have the SQL produced include a C<DROP TABLE> statement for each table
2287 created. For quoting purposes supply C<quote_table_names> and
2288 C<quote_field_names>.
2289
2290 If no arguments are passed, then the following default values are assumed:
2291
2292 =over 4
2293
2294 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2295
2296 =item version    - $schema->schema_version
2297
2298 =item directory  - './'
2299
2300 =item preversion - <none>
2301
2302 =back
2303
2304 By default, C<\%sqlt_args> will have
2305
2306  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2307
2308 merged with the hash passed in. To disable any of those features, pass in a
2309 hashref like the following
2310
2311  { ignore_constraint_names => 0, # ... other options }
2312
2313
2314 WARNING: You are strongly advised to check all SQL files created, before applying
2315 them.
2316
2317 =cut
2318
2319 sub create_ddl_dir {
2320   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2321
2322   unless ($dir) {
2323     carp "No directory given, using ./\n";
2324     $dir = './';
2325   } else {
2326       -d $dir or File::Path::mkpath($dir)
2327           or $self->throw_exception("create_ddl_dir: $! creating dir '$dir'");
2328   }
2329
2330   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2331
2332   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2333   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2334
2335   my $schema_version = $schema->schema_version || '1.x';
2336   $version ||= $schema_version;
2337
2338   $sqltargs = {
2339     add_drop_table => 1,
2340     ignore_constraint_names => 1,
2341     ignore_index_names => 1,
2342     %{$sqltargs || {}}
2343   };
2344
2345   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2346     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2347   }
2348
2349   my $sqlt = SQL::Translator->new( $sqltargs );
2350
2351   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2352   my $sqlt_schema = $sqlt->translate({ data => $schema })
2353     or $self->throw_exception ($sqlt->error);
2354
2355   foreach my $db (@$databases) {
2356     $sqlt->reset();
2357     $sqlt->{schema} = $sqlt_schema;
2358     $sqlt->producer($db);
2359
2360     my $file;
2361     my $filename = $schema->ddl_filename($db, $version, $dir);
2362     if (-e $filename && ($version eq $schema_version )) {
2363       # if we are dumping the current version, overwrite the DDL
2364       carp "Overwriting existing DDL file - $filename";
2365       unlink($filename);
2366     }
2367
2368     my $output = $sqlt->translate;
2369     if(!$output) {
2370       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2371       next;
2372     }
2373     if(!open($file, ">$filename")) {
2374       $self->throw_exception("Can't open $filename for writing ($!)");
2375       next;
2376     }
2377     print $file $output;
2378     close($file);
2379
2380     next unless ($preversion);
2381
2382     require SQL::Translator::Diff;
2383
2384     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2385     if(!-e $prefilename) {
2386       carp("No previous schema file found ($prefilename)");
2387       next;
2388     }
2389
2390     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2391     if(-e $difffile) {
2392       carp("Overwriting existing diff file - $difffile");
2393       unlink($difffile);
2394     }
2395
2396     my $source_schema;
2397     {
2398       my $t = SQL::Translator->new($sqltargs);
2399       $t->debug( 0 );
2400       $t->trace( 0 );
2401
2402       $t->parser( $db )
2403         or $self->throw_exception ($t->error);
2404
2405       my $out = $t->translate( $prefilename )
2406         or $self->throw_exception ($t->error);
2407
2408       $source_schema = $t->schema;
2409
2410       $source_schema->name( $prefilename )
2411         unless ( $source_schema->name );
2412     }
2413
2414     # The "new" style of producers have sane normalization and can support
2415     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2416     # And we have to diff parsed SQL against parsed SQL.
2417     my $dest_schema = $sqlt_schema;
2418
2419     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2420       my $t = SQL::Translator->new($sqltargs);
2421       $t->debug( 0 );
2422       $t->trace( 0 );
2423
2424       $t->parser( $db )
2425         or $self->throw_exception ($t->error);
2426
2427       my $out = $t->translate( $filename )
2428         or $self->throw_exception ($t->error);
2429
2430       $dest_schema = $t->schema;
2431
2432       $dest_schema->name( $filename )
2433         unless $dest_schema->name;
2434     }
2435
2436     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2437                                                   $dest_schema,   $db,
2438                                                   $sqltargs
2439                                                  );
2440     if(!open $file, ">$difffile") {
2441       $self->throw_exception("Can't write to $difffile ($!)");
2442       next;
2443     }
2444     print $file $diff;
2445     close($file);
2446   }
2447 }
2448
2449 =head2 deployment_statements
2450
2451 =over 4
2452
2453 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2454
2455 =back
2456
2457 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2458
2459 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2460 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2461
2462 C<$directory> is used to return statements from files in a previously created
2463 L</create_ddl_dir> directory and is optional. The filenames are constructed
2464 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2465
2466 If no C<$directory> is specified then the statements are constructed on the
2467 fly using L<SQL::Translator> and C<$version> is ignored.
2468
2469 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2470
2471 =cut
2472
2473 sub deployment_statements {
2474   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2475   $type ||= $self->sqlt_type;
2476   $version ||= $schema->schema_version || '1.x';
2477   $dir ||= './';
2478   my $filename = $schema->ddl_filename($type, $version, $dir);
2479   if(-f $filename)
2480   {
2481       my $file;
2482       open($file, "<$filename")
2483         or $self->throw_exception("Can't open $filename ($!)");
2484       my @rows = <$file>;
2485       close($file);
2486       return join('', @rows);
2487   }
2488
2489   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2490     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2491   }
2492
2493   # sources needs to be a parser arg, but for simplicty allow at top level
2494   # coming in
2495   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2496       if exists $sqltargs->{sources};
2497
2498   my $tr = SQL::Translator->new(
2499     producer => "SQL::Translator::Producer::${type}",
2500     %$sqltargs,
2501     parser => 'SQL::Translator::Parser::DBIx::Class',
2502     data => $schema,
2503   );
2504
2505   my @ret;
2506   my $wa = wantarray;
2507   if ($wa) {
2508     @ret = $tr->translate;
2509   }
2510   else {
2511     $ret[0] = $tr->translate;
2512   }
2513
2514   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2515     unless (@ret && defined $ret[0]);
2516
2517   return $wa ? @ret : $ret[0];
2518 }
2519
2520 sub deploy {
2521   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2522   my $deploy = sub {
2523     my $line = shift;
2524     return if($line =~ /^--/);
2525     return if(!$line);
2526     # next if($line =~ /^DROP/m);
2527     return if($line =~ /^BEGIN TRANSACTION/m);
2528     return if($line =~ /^COMMIT/m);
2529     return if $line =~ /^\s+$/; # skip whitespace only
2530     $self->_query_start($line);
2531     try {
2532       # do a dbh_do cycle here, as we need some error checking in
2533       # place (even though we will ignore errors)
2534       $self->dbh_do (sub { $_[1]->do($line) });
2535     } catch {
2536       carp qq{$_ (running "${line}")};
2537     };
2538     $self->_query_end($line);
2539   };
2540   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2541   if (@statements > 1) {
2542     foreach my $statement (@statements) {
2543       $deploy->( $statement );
2544     }
2545   }
2546   elsif (@statements == 1) {
2547     foreach my $line ( split(";\n", $statements[0])) {
2548       $deploy->( $line );
2549     }
2550   }
2551 }
2552
2553 =head2 datetime_parser
2554
2555 Returns the datetime parser class
2556
2557 =cut
2558
2559 sub datetime_parser {
2560   my $self = shift;
2561   return $self->{datetime_parser} ||= do {
2562     $self->build_datetime_parser(@_);
2563   };
2564 }
2565
2566 =head2 datetime_parser_type
2567
2568 Defines (returns) the datetime parser class - currently hardwired to
2569 L<DateTime::Format::MySQL>
2570
2571 =cut
2572
2573 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2574
2575 =head2 build_datetime_parser
2576
2577 See L</datetime_parser>
2578
2579 =cut
2580
2581 sub build_datetime_parser {
2582   my $self = shift;
2583   my $type = $self->datetime_parser_type(@_);
2584   $self->ensure_class_loaded ($type);
2585   return $type;
2586 }
2587
2588
2589 =head2 is_replicating
2590
2591 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2592 replicate from a master database.  Default is undef, which is the result
2593 returned by databases that don't support replication.
2594
2595 =cut
2596
2597 sub is_replicating {
2598     return;
2599
2600 }
2601
2602 =head2 lag_behind_master
2603
2604 Returns a number that represents a certain amount of lag behind a master db
2605 when a given storage is replicating.  The number is database dependent, but
2606 starts at zero and increases with the amount of lag. Default in undef
2607
2608 =cut
2609
2610 sub lag_behind_master {
2611     return;
2612 }
2613
2614 =head2 relname_to_table_alias
2615
2616 =over 4
2617
2618 =item Arguments: $relname, $join_count
2619
2620 =back
2621
2622 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2623 queries.
2624
2625 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2626 way these aliases are named.
2627
2628 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2629 otherwise C<"$relname">.
2630
2631 =cut
2632
2633 sub relname_to_table_alias {
2634   my ($self, $relname, $join_count) = @_;
2635
2636   my $alias = ($join_count && $join_count > 1 ?
2637     join('_', $relname, $join_count) : $relname);
2638
2639   return $alias;
2640 }
2641
2642 1;
2643
2644 =head1 USAGE NOTES
2645
2646 =head2 DBIx::Class and AutoCommit
2647
2648 DBIx::Class can do some wonderful magic with handling exceptions,
2649 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2650 (the default) combined with C<txn_do> for transaction support.
2651
2652 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2653 in an assumed transaction between commits, and you're telling us you'd
2654 like to manage that manually.  A lot of the magic protections offered by
2655 this module will go away.  We can't protect you from exceptions due to database
2656 disconnects because we don't know anything about how to restart your
2657 transactions.  You're on your own for handling all sorts of exceptional
2658 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2659 be with raw DBI.
2660
2661
2662 =head1 AUTHORS
2663
2664 Matt S. Trout <mst@shadowcatsystems.co.uk>
2665
2666 Andy Grundman <andy@hybridized.org>
2667
2668 =head1 LICENSE
2669
2670 You may distribute this code under the same terms as Perl itself.
2671
2672 =cut