Really fix INSERT RETURNING - simply make it a flag on the storage and keep the machi...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17 use Sub::Name ();
18
19 __PACKAGE__->mk_group_accessors('simple' =>
20   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
21      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
22 );
23
24 # the values for these accessors are picked out (and deleted) from
25 # the attribute hashref passed to connect_info
26 my @storage_options = qw/
27   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
28   disable_sth_caching unsafe auto_savepoint
29 /;
30 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
31
32
33 # default cursor class, overridable in connect_info attributes
34 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
35
36 __PACKAGE__->mk_group_accessors('inherited' => qw/
37   sql_maker_class
38   can_insert_returning
39 /);
40 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
41
42
43 # Each of these methods need _determine_driver called before itself
44 # in order to function reliably. This is a purely DRY optimization
45 my @rdbms_specific_methods = qw/
46   deployment_statements
47   sqlt_type
48   build_datetime_parser
49   datetime_parser_type
50
51   insert
52   insert_bulk
53   update
54   delete
55   select
56   select_single
57 /;
58
59 for my $meth (@rdbms_specific_methods) {
60
61   my $orig = __PACKAGE__->can ($meth)
62     or next;
63
64   no strict qw/refs/;
65   no warnings qw/redefine/;
66   *{__PACKAGE__ ."::$meth"} = Sub::Name::subname $meth => sub {
67     if (not $_[0]->_driver_determined) {
68       $_[0]->_determine_driver;
69       goto $_[0]->can($meth);
70     }
71     $orig->(@_);
72   };
73 }
74
75
76 =head1 NAME
77
78 DBIx::Class::Storage::DBI - DBI storage handler
79
80 =head1 SYNOPSIS
81
82   my $schema = MySchema->connect('dbi:SQLite:my.db');
83
84   $schema->storage->debug(1);
85
86   my @stuff = $schema->storage->dbh_do(
87     sub {
88       my ($storage, $dbh, @args) = @_;
89       $dbh->do("DROP TABLE authors");
90     },
91     @column_list
92   );
93
94   $schema->resultset('Book')->search({
95      written_on => $schema->storage->datetime_parser(DateTime->now)
96   });
97
98 =head1 DESCRIPTION
99
100 This class represents the connection to an RDBMS via L<DBI>.  See
101 L<DBIx::Class::Storage> for general information.  This pod only
102 documents DBI-specific methods and behaviors.
103
104 =head1 METHODS
105
106 =cut
107
108 sub new {
109   my $new = shift->next::method(@_);
110
111   $new->transaction_depth(0);
112   $new->_sql_maker_opts({});
113   $new->{savepoints} = [];
114   $new->{_in_dbh_do} = 0;
115   $new->{_dbh_gen} = 0;
116
117   $new;
118 }
119
120 =head2 connect_info
121
122 This method is normally called by L<DBIx::Class::Schema/connection>, which
123 encapsulates its argument list in an arrayref before passing them here.
124
125 The argument list may contain:
126
127 =over
128
129 =item *
130
131 The same 4-element argument set one would normally pass to
132 L<DBI/connect>, optionally followed by
133 L<extra attributes|/DBIx::Class specific connection attributes>
134 recognized by DBIx::Class:
135
136   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
137
138 =item *
139
140 A single code reference which returns a connected
141 L<DBI database handle|DBI/connect> optionally followed by
142 L<extra attributes|/DBIx::Class specific connection attributes> recognized
143 by DBIx::Class:
144
145   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
146
147 =item *
148
149 A single hashref with all the attributes and the dsn/user/password
150 mixed together:
151
152   $connect_info_args = [{
153     dsn => $dsn,
154     user => $user,
155     password => $pass,
156     %dbi_attributes,
157     %extra_attributes,
158   }];
159
160   $connect_info_args = [{
161     dbh_maker => sub { DBI->connect (...) },
162     %dbi_attributes,
163     %extra_attributes,
164   }];
165
166 This is particularly useful for L<Catalyst> based applications, allowing the
167 following config (L<Config::General> style):
168
169   <Model::DB>
170     schema_class   App::DB
171     <connect_info>
172       dsn          dbi:mysql:database=test
173       user         testuser
174       password     TestPass
175       AutoCommit   1
176     </connect_info>
177   </Model::DB>
178
179 The C<dsn>/C<user>/C<password> combination can be substituted by the
180 C<dbh_maker> key whose value is a coderef that returns a connected
181 L<DBI database handle|DBI/connect>
182
183 =back
184
185 Please note that the L<DBI> docs recommend that you always explicitly
186 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
187 recommends that it be set to I<1>, and that you perform transactions
188 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
189 to I<1> if you do not do explicitly set it to zero.  This is the default
190 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
191
192 =head3 DBIx::Class specific connection attributes
193
194 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
195 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
196 the following connection options. These options can be mixed in with your other
197 L<DBI> connection attributes, or placed in a separate hashref
198 (C<\%extra_attributes>) as shown above.
199
200 Every time C<connect_info> is invoked, any previous settings for
201 these options will be cleared before setting the new ones, regardless of
202 whether any options are specified in the new C<connect_info>.
203
204
205 =over
206
207 =item on_connect_do
208
209 Specifies things to do immediately after connecting or re-connecting to
210 the database.  Its value may contain:
211
212 =over
213
214 =item a scalar
215
216 This contains one SQL statement to execute.
217
218 =item an array reference
219
220 This contains SQL statements to execute in order.  Each element contains
221 a string or a code reference that returns a string.
222
223 =item a code reference
224
225 This contains some code to execute.  Unlike code references within an
226 array reference, its return value is ignored.
227
228 =back
229
230 =item on_disconnect_do
231
232 Takes arguments in the same form as L</on_connect_do> and executes them
233 immediately before disconnecting from the database.
234
235 Note, this only runs if you explicitly call L</disconnect> on the
236 storage object.
237
238 =item on_connect_call
239
240 A more generalized form of L</on_connect_do> that calls the specified
241 C<connect_call_METHOD> methods in your storage driver.
242
243   on_connect_do => 'select 1'
244
245 is equivalent to:
246
247   on_connect_call => [ [ do_sql => 'select 1' ] ]
248
249 Its values may contain:
250
251 =over
252
253 =item a scalar
254
255 Will call the C<connect_call_METHOD> method.
256
257 =item a code reference
258
259 Will execute C<< $code->($storage) >>
260
261 =item an array reference
262
263 Each value can be a method name or code reference.
264
265 =item an array of arrays
266
267 For each array, the first item is taken to be the C<connect_call_> method name
268 or code reference, and the rest are parameters to it.
269
270 =back
271
272 Some predefined storage methods you may use:
273
274 =over
275
276 =item do_sql
277
278 Executes a SQL string or a code reference that returns a SQL string. This is
279 what L</on_connect_do> and L</on_disconnect_do> use.
280
281 It can take:
282
283 =over
284
285 =item a scalar
286
287 Will execute the scalar as SQL.
288
289 =item an arrayref
290
291 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
292 attributes hashref and bind values.
293
294 =item a code reference
295
296 Will execute C<< $code->($storage) >> and execute the return array refs as
297 above.
298
299 =back
300
301 =item datetime_setup
302
303 Execute any statements necessary to initialize the database session to return
304 and accept datetime/timestamp values used with
305 L<DBIx::Class::InflateColumn::DateTime>.
306
307 Only necessary for some databases, see your specific storage driver for
308 implementation details.
309
310 =back
311
312 =item on_disconnect_call
313
314 Takes arguments in the same form as L</on_connect_call> and executes them
315 immediately before disconnecting from the database.
316
317 Calls the C<disconnect_call_METHOD> methods as opposed to the
318 C<connect_call_METHOD> methods called by L</on_connect_call>.
319
320 Note, this only runs if you explicitly call L</disconnect> on the
321 storage object.
322
323 =item disable_sth_caching
324
325 If set to a true value, this option will disable the caching of
326 statement handles via L<DBI/prepare_cached>.
327
328 =item limit_dialect
329
330 Sets the limit dialect. This is useful for JDBC-bridge among others
331 where the remote SQL-dialect cannot be determined by the name of the
332 driver alone. See also L<SQL::Abstract::Limit>.
333
334 =item quote_char
335
336 Specifies what characters to use to quote table and column names. If
337 you use this you will want to specify L</name_sep> as well.
338
339 C<quote_char> expects either a single character, in which case is it
340 is placed on either side of the table/column name, or an arrayref of length
341 2 in which case the table/column name is placed between the elements.
342
343 For example under MySQL you should use C<< quote_char => '`' >>, and for
344 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
345
346 =item name_sep
347
348 This only needs to be used in conjunction with C<quote_char>, and is used to
349 specify the character that separates elements (schemas, tables, columns) from
350 each other. In most cases this is simply a C<.>.
351
352 The consequences of not supplying this value is that L<SQL::Abstract>
353 will assume DBIx::Class' uses of aliases to be complete column
354 names. The output will look like I<"me.name"> when it should actually
355 be I<"me"."name">.
356
357 =item unsafe
358
359 This Storage driver normally installs its own C<HandleError>, sets
360 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
361 all database handles, including those supplied by a coderef.  It does this
362 so that it can have consistent and useful error behavior.
363
364 If you set this option to a true value, Storage will not do its usual
365 modifications to the database handle's attributes, and instead relies on
366 the settings in your connect_info DBI options (or the values you set in
367 your connection coderef, in the case that you are connecting via coderef).
368
369 Note that your custom settings can cause Storage to malfunction,
370 especially if you set a C<HandleError> handler that suppresses exceptions
371 and/or disable C<RaiseError>.
372
373 =item auto_savepoint
374
375 If this option is true, L<DBIx::Class> will use savepoints when nesting
376 transactions, making it possible to recover from failure in the inner
377 transaction without having to abort all outer transactions.
378
379 =item cursor_class
380
381 Use this argument to supply a cursor class other than the default
382 L<DBIx::Class::Storage::DBI::Cursor>.
383
384 =back
385
386 Some real-life examples of arguments to L</connect_info> and
387 L<DBIx::Class::Schema/connect>
388
389   # Simple SQLite connection
390   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
391
392   # Connect via subref
393   ->connect_info([ sub { DBI->connect(...) } ]);
394
395   # Connect via subref in hashref
396   ->connect_info([{
397     dbh_maker => sub { DBI->connect(...) },
398     on_connect_do => 'alter session ...',
399   }]);
400
401   # A bit more complicated
402   ->connect_info(
403     [
404       'dbi:Pg:dbname=foo',
405       'postgres',
406       'my_pg_password',
407       { AutoCommit => 1 },
408       { quote_char => q{"}, name_sep => q{.} },
409     ]
410   );
411
412   # Equivalent to the previous example
413   ->connect_info(
414     [
415       'dbi:Pg:dbname=foo',
416       'postgres',
417       'my_pg_password',
418       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
419     ]
420   );
421
422   # Same, but with hashref as argument
423   # See parse_connect_info for explanation
424   ->connect_info(
425     [{
426       dsn         => 'dbi:Pg:dbname=foo',
427       user        => 'postgres',
428       password    => 'my_pg_password',
429       AutoCommit  => 1,
430       quote_char  => q{"},
431       name_sep    => q{.},
432     }]
433   );
434
435   # Subref + DBIx::Class-specific connection options
436   ->connect_info(
437     [
438       sub { DBI->connect(...) },
439       {
440           quote_char => q{`},
441           name_sep => q{@},
442           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
443           disable_sth_caching => 1,
444       },
445     ]
446   );
447
448
449
450 =cut
451
452 sub connect_info {
453   my ($self, $info) = @_;
454
455   return $self->_connect_info if !$info;
456
457   $self->_connect_info($info); # copy for _connect_info
458
459   $info = $self->_normalize_connect_info($info)
460     if ref $info eq 'ARRAY';
461
462   for my $storage_opt (keys %{ $info->{storage_options} }) {
463     my $value = $info->{storage_options}{$storage_opt};
464
465     $self->$storage_opt($value);
466   }
467
468   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
469   #  the new set of options
470   $self->_sql_maker(undef);
471   $self->_sql_maker_opts({});
472
473   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
474     my $value = $info->{sql_maker_options}{$sql_maker_opt};
475
476     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
477   }
478
479   my %attrs = (
480     %{ $self->_default_dbi_connect_attributes || {} },
481     %{ $info->{attributes} || {} },
482   );
483
484   my @args = @{ $info->{arguments} };
485
486   $self->_dbi_connect_info([@args,
487     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
488
489   return $self->_connect_info;
490 }
491
492 sub _normalize_connect_info {
493   my ($self, $info_arg) = @_;
494   my %info;
495
496   my @args = @$info_arg;  # take a shallow copy for further mutilation
497
498   # combine/pre-parse arguments depending on invocation style
499
500   my %attrs;
501   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
502     %attrs = %{ $args[1] || {} };
503     @args = $args[0];
504   }
505   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
506     %attrs = %{$args[0]};
507     @args = ();
508     if (my $code = delete $attrs{dbh_maker}) {
509       @args = $code;
510
511       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
512       if (@ignored) {
513         carp sprintf (
514             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
515           . "to the result of 'dbh_maker'",
516
517           join (', ', map { "'$_'" } (@ignored) ),
518         );
519       }
520     }
521     else {
522       @args = delete @attrs{qw/dsn user password/};
523     }
524   }
525   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
526     %attrs = (
527       % { $args[3] || {} },
528       % { $args[4] || {} },
529     );
530     @args = @args[0,1,2];
531   }
532
533   $info{arguments} = \@args;
534
535   my @storage_opts = grep exists $attrs{$_},
536     @storage_options, 'cursor_class';
537
538   @{ $info{storage_options} }{@storage_opts} =
539     delete @attrs{@storage_opts} if @storage_opts;
540
541   my @sql_maker_opts = grep exists $attrs{$_},
542     qw/limit_dialect quote_char name_sep/;
543
544   @{ $info{sql_maker_options} }{@sql_maker_opts} =
545     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
546
547   $info{attributes} = \%attrs if %attrs;
548
549   return \%info;
550 }
551
552 sub _default_dbi_connect_attributes {
553   return {
554     AutoCommit => 1,
555     RaiseError => 1,
556     PrintError => 0,
557   };
558 }
559
560 =head2 on_connect_do
561
562 This method is deprecated in favour of setting via L</connect_info>.
563
564 =cut
565
566 =head2 on_disconnect_do
567
568 This method is deprecated in favour of setting via L</connect_info>.
569
570 =cut
571
572 sub _parse_connect_do {
573   my ($self, $type) = @_;
574
575   my $val = $self->$type;
576   return () if not defined $val;
577
578   my @res;
579
580   if (not ref($val)) {
581     push @res, [ 'do_sql', $val ];
582   } elsif (ref($val) eq 'CODE') {
583     push @res, $val;
584   } elsif (ref($val) eq 'ARRAY') {
585     push @res, map { [ 'do_sql', $_ ] } @$val;
586   } else {
587     $self->throw_exception("Invalid type for $type: ".ref($val));
588   }
589
590   return \@res;
591 }
592
593 =head2 dbh_do
594
595 Arguments: ($subref | $method_name), @extra_coderef_args?
596
597 Execute the given $subref or $method_name using the new exception-based
598 connection management.
599
600 The first two arguments will be the storage object that C<dbh_do> was called
601 on and a database handle to use.  Any additional arguments will be passed
602 verbatim to the called subref as arguments 2 and onwards.
603
604 Using this (instead of $self->_dbh or $self->dbh) ensures correct
605 exception handling and reconnection (or failover in future subclasses).
606
607 Your subref should have no side-effects outside of the database, as
608 there is the potential for your subref to be partially double-executed
609 if the database connection was stale/dysfunctional.
610
611 Example:
612
613   my @stuff = $schema->storage->dbh_do(
614     sub {
615       my ($storage, $dbh, @cols) = @_;
616       my $cols = join(q{, }, @cols);
617       $dbh->selectrow_array("SELECT $cols FROM foo");
618     },
619     @column_list
620   );
621
622 =cut
623
624 sub dbh_do {
625   my $self = shift;
626   my $code = shift;
627
628   my $dbh = $self->_get_dbh;
629
630   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
631       || $self->{transaction_depth};
632
633   local $self->{_in_dbh_do} = 1;
634
635   my @result;
636   my $want_array = wantarray;
637
638   eval {
639
640     if($want_array) {
641         @result = $self->$code($dbh, @_);
642     }
643     elsif(defined $want_array) {
644         $result[0] = $self->$code($dbh, @_);
645     }
646     else {
647         $self->$code($dbh, @_);
648     }
649   };
650
651   # ->connected might unset $@ - copy
652   my $exception = $@;
653   if(!$exception) { return $want_array ? @result : $result[0] }
654
655   $self->throw_exception($exception) if $self->connected;
656
657   # We were not connected - reconnect and retry, but let any
658   #  exception fall right through this time
659   carp "Retrying $code after catching disconnected exception: $exception"
660     if $ENV{DBIC_DBIRETRY_DEBUG};
661   $self->_populate_dbh;
662   $self->$code($self->_dbh, @_);
663 }
664
665 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
666 # It also informs dbh_do to bypass itself while under the direction of txn_do,
667 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
668 sub txn_do {
669   my $self = shift;
670   my $coderef = shift;
671
672   ref $coderef eq 'CODE' or $self->throw_exception
673     ('$coderef must be a CODE reference');
674
675   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
676
677   local $self->{_in_dbh_do} = 1;
678
679   my @result;
680   my $want_array = wantarray;
681
682   my $tried = 0;
683   while(1) {
684     eval {
685       $self->_get_dbh;
686
687       $self->txn_begin;
688       if($want_array) {
689           @result = $coderef->(@_);
690       }
691       elsif(defined $want_array) {
692           $result[0] = $coderef->(@_);
693       }
694       else {
695           $coderef->(@_);
696       }
697       $self->txn_commit;
698     };
699
700     # ->connected might unset $@ - copy
701     my $exception = $@;
702     if(!$exception) { return $want_array ? @result : $result[0] }
703
704     if($tried++ || $self->connected) {
705       eval { $self->txn_rollback };
706       my $rollback_exception = $@;
707       if($rollback_exception) {
708         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
709         $self->throw_exception($exception)  # propagate nested rollback
710           if $rollback_exception =~ /$exception_class/;
711
712         $self->throw_exception(
713           "Transaction aborted: ${exception}. "
714           . "Rollback failed: ${rollback_exception}"
715         );
716       }
717       $self->throw_exception($exception)
718     }
719
720     # We were not connected, and was first try - reconnect and retry
721     # via the while loop
722     carp "Retrying $coderef after catching disconnected exception: $exception"
723       if $ENV{DBIC_DBIRETRY_DEBUG};
724     $self->_populate_dbh;
725   }
726 }
727
728 =head2 disconnect
729
730 Our C<disconnect> method also performs a rollback first if the
731 database is not in C<AutoCommit> mode.
732
733 =cut
734
735 sub disconnect {
736   my ($self) = @_;
737
738   if( $self->_dbh ) {
739     my @actions;
740
741     push @actions, ( $self->on_disconnect_call || () );
742     push @actions, $self->_parse_connect_do ('on_disconnect_do');
743
744     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
745
746     $self->_dbh_rollback unless $self->_dbh_autocommit;
747
748     %{ $self->_dbh->{CachedKids} } = ();
749     $self->_dbh->disconnect;
750     $self->_dbh(undef);
751     $self->{_dbh_gen}++;
752   }
753 }
754
755 =head2 with_deferred_fk_checks
756
757 =over 4
758
759 =item Arguments: C<$coderef>
760
761 =item Return Value: The return value of $coderef
762
763 =back
764
765 Storage specific method to run the code ref with FK checks deferred or
766 in MySQL's case disabled entirely.
767
768 =cut
769
770 # Storage subclasses should override this
771 sub with_deferred_fk_checks {
772   my ($self, $sub) = @_;
773   $sub->();
774 }
775
776 =head2 connected
777
778 =over
779
780 =item Arguments: none
781
782 =item Return Value: 1|0
783
784 =back
785
786 Verifies that the current database handle is active and ready to execute
787 an SQL statement (e.g. the connection did not get stale, server is still
788 answering, etc.) This method is used internally by L</dbh>.
789
790 =cut
791
792 sub connected {
793   my $self = shift;
794   return 0 unless $self->_seems_connected;
795
796   #be on the safe side
797   local $self->_dbh->{RaiseError} = 1;
798
799   return $self->_ping;
800 }
801
802 sub _seems_connected {
803   my $self = shift;
804
805   my $dbh = $self->_dbh
806     or return 0;
807
808   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
809     $self->_dbh(undef);
810     $self->{_dbh_gen}++;
811     return 0;
812   }
813   else {
814     $self->_verify_pid;
815     return 0 if !$self->_dbh;
816   }
817
818   return $dbh->FETCH('Active');
819 }
820
821 sub _ping {
822   my $self = shift;
823
824   my $dbh = $self->_dbh or return 0;
825
826   return $dbh->ping;
827 }
828
829 # handle pid changes correctly
830 #  NOTE: assumes $self->_dbh is a valid $dbh
831 sub _verify_pid {
832   my ($self) = @_;
833
834   return if defined $self->_conn_pid && $self->_conn_pid == $$;
835
836   $self->_dbh->{InactiveDestroy} = 1;
837   $self->_dbh(undef);
838   $self->{_dbh_gen}++;
839
840   return;
841 }
842
843 sub ensure_connected {
844   my ($self) = @_;
845
846   unless ($self->connected) {
847     $self->_populate_dbh;
848   }
849 }
850
851 =head2 dbh
852
853 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
854 is guaranteed to be healthy by implicitly calling L</connected>, and if
855 necessary performing a reconnection before returning. Keep in mind that this
856 is very B<expensive> on some database engines. Consider using L</dbh_do>
857 instead.
858
859 =cut
860
861 sub dbh {
862   my ($self) = @_;
863
864   if (not $self->_dbh) {
865     $self->_populate_dbh;
866   } else {
867     $self->ensure_connected;
868   }
869   return $self->_dbh;
870 }
871
872 # this is the internal "get dbh or connect (don't check)" method
873 sub _get_dbh {
874   my $self = shift;
875   $self->_verify_pid if $self->_dbh;
876   $self->_populate_dbh unless $self->_dbh;
877   return $self->_dbh;
878 }
879
880 sub _sql_maker_args {
881     my ($self) = @_;
882
883     return (
884       bindtype=>'columns',
885       array_datatypes => 1,
886       limit_dialect => $self->_get_dbh,
887       %{$self->_sql_maker_opts}
888     );
889 }
890
891 sub sql_maker {
892   my ($self) = @_;
893   unless ($self->_sql_maker) {
894     my $sql_maker_class = $self->sql_maker_class;
895     $self->ensure_class_loaded ($sql_maker_class);
896     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
897   }
898   return $self->_sql_maker;
899 }
900
901 # nothing to do by default
902 sub _rebless {}
903 sub _init {}
904
905 sub _populate_dbh {
906   my ($self) = @_;
907
908   my @info = @{$self->_dbi_connect_info || []};
909   $self->_dbh(undef); # in case ->connected failed we might get sent here
910   $self->_dbh($self->_connect(@info));
911
912   $self->_conn_pid($$);
913   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
914
915   $self->_determine_driver;
916
917   # Always set the transaction depth on connect, since
918   #  there is no transaction in progress by definition
919   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
920
921   $self->_run_connection_actions unless $self->{_in_determine_driver};
922 }
923
924 sub _run_connection_actions {
925   my $self = shift;
926   my @actions;
927
928   push @actions, ( $self->on_connect_call || () );
929   push @actions, $self->_parse_connect_do ('on_connect_do');
930
931   $self->_do_connection_actions(connect_call_ => $_) for @actions;
932 }
933
934 sub _determine_driver {
935   my ($self) = @_;
936
937   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
938     my $started_connected = 0;
939     local $self->{_in_determine_driver} = 1;
940
941     if (ref($self) eq __PACKAGE__) {
942       my $driver;
943       if ($self->_dbh) { # we are connected
944         $driver = $self->_dbh->{Driver}{Name};
945         $started_connected = 1;
946       } else {
947         # if connect_info is a CODEREF, we have no choice but to connect
948         if (ref $self->_dbi_connect_info->[0] &&
949             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
950           $self->_populate_dbh;
951           $driver = $self->_dbh->{Driver}{Name};
952         }
953         else {
954           # try to use dsn to not require being connected, the driver may still
955           # force a connection in _rebless to determine version
956           # (dsn may not be supplied at all if all we do is make a mock-schema)
957           my $dsn = $self->_dbi_connect_info->[0] || '';
958           ($driver) = $dsn =~ /dbi:([^:]+):/i;
959         }
960       }
961
962       if ($driver) {
963         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
964         if ($self->load_optional_class($storage_class)) {
965           mro::set_mro($storage_class, 'c3');
966           bless $self, $storage_class;
967           $self->_rebless();
968         }
969       }
970     }
971
972     $self->_driver_determined(1);
973
974     $self->_init; # run driver-specific initializations
975
976     $self->_run_connection_actions
977         if !$started_connected && defined $self->_dbh;
978   }
979 }
980
981 sub _do_connection_actions {
982   my $self          = shift;
983   my $method_prefix = shift;
984   my $call          = shift;
985
986   if (not ref($call)) {
987     my $method = $method_prefix . $call;
988     $self->$method(@_);
989   } elsif (ref($call) eq 'CODE') {
990     $self->$call(@_);
991   } elsif (ref($call) eq 'ARRAY') {
992     if (ref($call->[0]) ne 'ARRAY') {
993       $self->_do_connection_actions($method_prefix, $_) for @$call;
994     } else {
995       $self->_do_connection_actions($method_prefix, @$_) for @$call;
996     }
997   } else {
998     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
999   }
1000
1001   return $self;
1002 }
1003
1004 sub connect_call_do_sql {
1005   my $self = shift;
1006   $self->_do_query(@_);
1007 }
1008
1009 sub disconnect_call_do_sql {
1010   my $self = shift;
1011   $self->_do_query(@_);
1012 }
1013
1014 # override in db-specific backend when necessary
1015 sub connect_call_datetime_setup { 1 }
1016
1017 sub _do_query {
1018   my ($self, $action) = @_;
1019
1020   if (ref $action eq 'CODE') {
1021     $action = $action->($self);
1022     $self->_do_query($_) foreach @$action;
1023   }
1024   else {
1025     # Most debuggers expect ($sql, @bind), so we need to exclude
1026     # the attribute hash which is the second argument to $dbh->do
1027     # furthermore the bind values are usually to be presented
1028     # as named arrayref pairs, so wrap those here too
1029     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1030     my $sql = shift @do_args;
1031     my $attrs = shift @do_args;
1032     my @bind = map { [ undef, $_ ] } @do_args;
1033
1034     $self->_query_start($sql, @bind);
1035     $self->_get_dbh->do($sql, $attrs, @do_args);
1036     $self->_query_end($sql, @bind);
1037   }
1038
1039   return $self;
1040 }
1041
1042 sub _connect {
1043   my ($self, @info) = @_;
1044
1045   $self->throw_exception("You failed to provide any connection info")
1046     if !@info;
1047
1048   my ($old_connect_via, $dbh);
1049
1050   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1051     $old_connect_via = $DBI::connect_via;
1052     $DBI::connect_via = 'connect';
1053   }
1054
1055   eval {
1056     if(ref $info[0] eq 'CODE') {
1057        $dbh = $info[0]->();
1058     }
1059     else {
1060        $dbh = DBI->connect(@info);
1061     }
1062
1063     if($dbh && !$self->unsafe) {
1064       my $weak_self = $self;
1065       Scalar::Util::weaken($weak_self);
1066       $dbh->{HandleError} = sub {
1067           if ($weak_self) {
1068             $weak_self->throw_exception("DBI Exception: $_[0]");
1069           }
1070           else {
1071             # the handler may be invoked by something totally out of
1072             # the scope of DBIC
1073             croak ("DBI Exception: $_[0]");
1074           }
1075       };
1076       $dbh->{ShowErrorStatement} = 1;
1077       $dbh->{RaiseError} = 1;
1078       $dbh->{PrintError} = 0;
1079     }
1080   };
1081
1082   $DBI::connect_via = $old_connect_via if $old_connect_via;
1083
1084   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1085     if !$dbh || $@;
1086
1087   $self->_dbh_autocommit($dbh->{AutoCommit});
1088
1089   $dbh;
1090 }
1091
1092 sub svp_begin {
1093   my ($self, $name) = @_;
1094
1095   $name = $self->_svp_generate_name
1096     unless defined $name;
1097
1098   $self->throw_exception ("You can't use savepoints outside a transaction")
1099     if $self->{transaction_depth} == 0;
1100
1101   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1102     unless $self->can('_svp_begin');
1103
1104   push @{ $self->{savepoints} }, $name;
1105
1106   $self->debugobj->svp_begin($name) if $self->debug;
1107
1108   return $self->_svp_begin($name);
1109 }
1110
1111 sub svp_release {
1112   my ($self, $name) = @_;
1113
1114   $self->throw_exception ("You can't use savepoints outside a transaction")
1115     if $self->{transaction_depth} == 0;
1116
1117   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1118     unless $self->can('_svp_release');
1119
1120   if (defined $name) {
1121     $self->throw_exception ("Savepoint '$name' does not exist")
1122       unless grep { $_ eq $name } @{ $self->{savepoints} };
1123
1124     # Dig through the stack until we find the one we are releasing.  This keeps
1125     # the stack up to date.
1126     my $svp;
1127
1128     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1129   } else {
1130     $name = pop @{ $self->{savepoints} };
1131   }
1132
1133   $self->debugobj->svp_release($name) if $self->debug;
1134
1135   return $self->_svp_release($name);
1136 }
1137
1138 sub svp_rollback {
1139   my ($self, $name) = @_;
1140
1141   $self->throw_exception ("You can't use savepoints outside a transaction")
1142     if $self->{transaction_depth} == 0;
1143
1144   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1145     unless $self->can('_svp_rollback');
1146
1147   if (defined $name) {
1148       # If they passed us a name, verify that it exists in the stack
1149       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1150           $self->throw_exception("Savepoint '$name' does not exist!");
1151       }
1152
1153       # Dig through the stack until we find the one we are releasing.  This keeps
1154       # the stack up to date.
1155       while(my $s = pop(@{ $self->{savepoints} })) {
1156           last if($s eq $name);
1157       }
1158       # Add the savepoint back to the stack, as a rollback doesn't remove the
1159       # named savepoint, only everything after it.
1160       push(@{ $self->{savepoints} }, $name);
1161   } else {
1162       # We'll assume they want to rollback to the last savepoint
1163       $name = $self->{savepoints}->[-1];
1164   }
1165
1166   $self->debugobj->svp_rollback($name) if $self->debug;
1167
1168   return $self->_svp_rollback($name);
1169 }
1170
1171 sub _svp_generate_name {
1172     my ($self) = @_;
1173
1174     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1175 }
1176
1177 sub txn_begin {
1178   my $self = shift;
1179
1180   # this means we have not yet connected and do not know the AC status
1181   # (e.g. coderef $dbh)
1182   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1183
1184   if($self->{transaction_depth} == 0) {
1185     $self->debugobj->txn_begin()
1186       if $self->debug;
1187     $self->_dbh_begin_work;
1188   }
1189   elsif ($self->auto_savepoint) {
1190     $self->svp_begin;
1191   }
1192   $self->{transaction_depth}++;
1193 }
1194
1195 sub _dbh_begin_work {
1196   my $self = shift;
1197
1198   # if the user is utilizing txn_do - good for him, otherwise we need to
1199   # ensure that the $dbh is healthy on BEGIN.
1200   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1201   # will be replaced by a failure of begin_work itself (which will be
1202   # then retried on reconnect)
1203   if ($self->{_in_dbh_do}) {
1204     $self->_dbh->begin_work;
1205   } else {
1206     $self->dbh_do(sub { $_[1]->begin_work });
1207   }
1208 }
1209
1210 sub txn_commit {
1211   my $self = shift;
1212   if ($self->{transaction_depth} == 1) {
1213     $self->debugobj->txn_commit()
1214       if ($self->debug);
1215     $self->_dbh_commit;
1216     $self->{transaction_depth} = 0
1217       if $self->_dbh_autocommit;
1218   }
1219   elsif($self->{transaction_depth} > 1) {
1220     $self->{transaction_depth}--;
1221     $self->svp_release
1222       if $self->auto_savepoint;
1223   }
1224 }
1225
1226 sub _dbh_commit {
1227   my $self = shift;
1228   my $dbh  = $self->_dbh
1229     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1230   $dbh->commit;
1231 }
1232
1233 sub txn_rollback {
1234   my $self = shift;
1235   my $dbh = $self->_dbh;
1236   eval {
1237     if ($self->{transaction_depth} == 1) {
1238       $self->debugobj->txn_rollback()
1239         if ($self->debug);
1240       $self->{transaction_depth} = 0
1241         if $self->_dbh_autocommit;
1242       $self->_dbh_rollback;
1243     }
1244     elsif($self->{transaction_depth} > 1) {
1245       $self->{transaction_depth}--;
1246       if ($self->auto_savepoint) {
1247         $self->svp_rollback;
1248         $self->svp_release;
1249       }
1250     }
1251     else {
1252       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1253     }
1254   };
1255   if ($@) {
1256     my $error = $@;
1257     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1258     $error =~ /$exception_class/ and $self->throw_exception($error);
1259     # ensure that a failed rollback resets the transaction depth
1260     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1261     $self->throw_exception($error);
1262   }
1263 }
1264
1265 sub _dbh_rollback {
1266   my $self = shift;
1267   my $dbh  = $self->_dbh
1268     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1269   $dbh->rollback;
1270 }
1271
1272 # This used to be the top-half of _execute.  It was split out to make it
1273 #  easier to override in NoBindVars without duping the rest.  It takes up
1274 #  all of _execute's args, and emits $sql, @bind.
1275 sub _prep_for_execute {
1276   my ($self, $op, $extra_bind, $ident, $args) = @_;
1277
1278   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1279     $ident = $ident->from();
1280   }
1281
1282   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1283
1284   unshift(@bind,
1285     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1286       if $extra_bind;
1287   return ($sql, \@bind);
1288 }
1289
1290
1291 sub _fix_bind_params {
1292     my ($self, @bind) = @_;
1293
1294     ### Turn @bind from something like this:
1295     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1296     ### to this:
1297     ###   ( "'1'", "'1'", "'3'" )
1298     return
1299         map {
1300             if ( defined( $_ && $_->[1] ) ) {
1301                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1302             }
1303             else { q{'NULL'}; }
1304         } @bind;
1305 }
1306
1307 sub _query_start {
1308     my ( $self, $sql, @bind ) = @_;
1309
1310     if ( $self->debug ) {
1311         @bind = $self->_fix_bind_params(@bind);
1312
1313         $self->debugobj->query_start( $sql, @bind );
1314     }
1315 }
1316
1317 sub _query_end {
1318     my ( $self, $sql, @bind ) = @_;
1319
1320     if ( $self->debug ) {
1321         @bind = $self->_fix_bind_params(@bind);
1322         $self->debugobj->query_end( $sql, @bind );
1323     }
1324 }
1325
1326 sub _dbh_execute {
1327   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1328
1329   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1330
1331   $self->_query_start( $sql, @$bind );
1332
1333   my $sth = $self->sth($sql,$op);
1334
1335   my $placeholder_index = 1;
1336
1337   foreach my $bound (@$bind) {
1338     my $attributes = {};
1339     my($column_name, @data) = @$bound;
1340
1341     if ($bind_attributes) {
1342       $attributes = $bind_attributes->{$column_name}
1343       if defined $bind_attributes->{$column_name};
1344     }
1345
1346     foreach my $data (@data) {
1347       my $ref = ref $data;
1348       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1349
1350       $sth->bind_param($placeholder_index, $data, $attributes);
1351       $placeholder_index++;
1352     }
1353   }
1354
1355   # Can this fail without throwing an exception anyways???
1356   my $rv = $sth->execute();
1357   $self->throw_exception($sth->errstr) if !$rv;
1358
1359   $self->_query_end( $sql, @$bind );
1360
1361   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1362 }
1363
1364 sub _execute {
1365     my $self = shift;
1366     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1367 }
1368
1369 sub _prefetch_insert_auto_nextvals {
1370   my ($self, $source, $to_insert) = @_;
1371
1372   my $upd = {};
1373
1374   foreach my $col ( $source->columns ) {
1375     if ( !defined $to_insert->{$col} ) {
1376       my $col_info = $source->column_info($col);
1377
1378       if ( $col_info->{auto_nextval} ) {
1379         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1380           'nextval',
1381           $col_info->{sequence} ||=
1382             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1383         );
1384       }
1385     }
1386   }
1387
1388   return $upd;
1389 }
1390
1391 sub insert {
1392   my $self = shift;
1393   my ($source, $to_insert, $opts) = @_;
1394
1395   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1396
1397   my $bind_attributes = $self->source_bind_attributes($source);
1398
1399   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1400
1401   if ($opts->{returning}) {
1402     my @ret_cols = @{$opts->{returning}};
1403
1404     my @ret_vals = eval {
1405       local $SIG{__WARN__} = sub {};
1406       my @r = $sth->fetchrow_array;
1407       $sth->finish;
1408       @r;
1409     };
1410
1411     my %ret;
1412     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1413
1414     $updated_cols = {
1415       %$updated_cols,
1416       %ret,
1417     };
1418   }
1419
1420   return $updated_cols;
1421 }
1422
1423 ## Currently it is assumed that all values passed will be "normal", i.e. not
1424 ## scalar refs, or at least, all the same type as the first set, the statement is
1425 ## only prepped once.
1426 sub insert_bulk {
1427   my ($self, $source, $cols, $data) = @_;
1428
1429   my %colvalues;
1430   @colvalues{@$cols} = (0..$#$cols);
1431
1432   for my $i (0..$#$cols) {
1433     my $first_val = $data->[0][$i];
1434     next unless ref $first_val eq 'SCALAR';
1435
1436     $colvalues{ $cols->[$i] } = $first_val;
1437   }
1438
1439   # check for bad data and stringify stringifiable objects
1440   my $bad_slice = sub {
1441     my ($msg, $col_idx, $slice_idx) = @_;
1442     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1443       $msg,
1444       $cols->[$col_idx],
1445       do {
1446         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1447         Data::Dumper::Concise::Dumper({
1448           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1449         }),
1450       }
1451     );
1452   };
1453
1454   for my $datum_idx (0..$#$data) {
1455     my $datum = $data->[$datum_idx];
1456
1457     for my $col_idx (0..$#$cols) {
1458       my $val            = $datum->[$col_idx];
1459       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1460       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1461
1462       if ($is_literal_sql) {
1463         if (not ref $val) {
1464           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1465         }
1466         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1467           $bad_slice->("$reftype reference found where literal SQL expected",
1468             $col_idx, $datum_idx);
1469         }
1470         elsif ($$val ne $$sqla_bind){
1471           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1472             $col_idx, $datum_idx);
1473         }
1474       }
1475       elsif (my $reftype = ref $val) {
1476         require overload;
1477         if (overload::Method($val, '""')) {
1478           $datum->[$col_idx] = "".$val;
1479         }
1480         else {
1481           $bad_slice->("$reftype reference found where bind expected",
1482             $col_idx, $datum_idx);
1483         }
1484       }
1485     }
1486   }
1487
1488   my ($sql, $bind) = $self->_prep_for_execute (
1489     'insert', undef, $source, [\%colvalues]
1490   );
1491   my @bind = @$bind;
1492
1493   my $empty_bind = 1 if (not @bind) &&
1494     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1495
1496   if ((not @bind) && (not $empty_bind)) {
1497     $self->throw_exception(
1498       'Cannot insert_bulk without support for placeholders'
1499     );
1500   }
1501
1502   # neither _execute_array, nor _execute_inserts_with_no_binds are
1503   # atomic (even if _execute _array is a single call). Thus a safety
1504   # scope guard
1505   my $guard = $self->txn_scope_guard;
1506
1507   $self->_query_start( $sql, ['__BULK__'] );
1508   my $sth = $self->sth($sql);
1509   my $rv = do {
1510     if ($empty_bind) {
1511       # bind_param_array doesn't work if there are no binds
1512       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1513     }
1514     else {
1515 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1516       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1517     }
1518   };
1519
1520   $self->_query_end( $sql, ['__BULK__'] );
1521
1522   $guard->commit;
1523
1524   return (wantarray ? ($rv, $sth, @bind) : $rv);
1525 }
1526
1527 sub _execute_array {
1528   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1529
1530   ## This must be an arrayref, else nothing works!
1531   my $tuple_status = [];
1532
1533   ## Get the bind_attributes, if any exist
1534   my $bind_attributes = $self->source_bind_attributes($source);
1535
1536   ## Bind the values and execute
1537   my $placeholder_index = 1;
1538
1539   foreach my $bound (@$bind) {
1540
1541     my $attributes = {};
1542     my ($column_name, $data_index) = @$bound;
1543
1544     if( $bind_attributes ) {
1545       $attributes = $bind_attributes->{$column_name}
1546       if defined $bind_attributes->{$column_name};
1547     }
1548
1549     my @data = map { $_->[$data_index] } @$data;
1550
1551     $sth->bind_param_array(
1552       $placeholder_index,
1553       [@data],
1554       (%$attributes ?  $attributes : ()),
1555     );
1556     $placeholder_index++;
1557   }
1558
1559   my $rv = eval {
1560     $self->_dbh_execute_array($sth, $tuple_status, @extra);
1561   };
1562   my $err = $@ || $sth->errstr;
1563
1564 # Statement must finish even if there was an exception.
1565   eval { $sth->finish };
1566   $err = $@ unless $err;
1567
1568   if ($err) {
1569     my $i = 0;
1570     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1571
1572     $self->throw_exception("Unexpected populate error: $err")
1573       if ($i > $#$tuple_status);
1574
1575     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1576       ($tuple_status->[$i][1] || $err),
1577       Data::Dumper::Concise::Dumper({
1578         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1579       }),
1580     );
1581   }
1582   return $rv;
1583 }
1584
1585 sub _dbh_execute_array {
1586     my ($self, $sth, $tuple_status, @extra) = @_;
1587
1588     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1589 }
1590
1591 sub _dbh_execute_inserts_with_no_binds {
1592   my ($self, $sth, $count) = @_;
1593
1594   eval {
1595     my $dbh = $self->_get_dbh;
1596     local $dbh->{RaiseError} = 1;
1597     local $dbh->{PrintError} = 0;
1598
1599     $sth->execute foreach 1..$count;
1600   };
1601   my $exception = $@;
1602
1603 # Make sure statement is finished even if there was an exception.
1604   eval { $sth->finish };
1605   $exception = $@ unless $exception;
1606
1607   $self->throw_exception($exception) if $exception;
1608
1609   return $count;
1610 }
1611
1612 sub update {
1613   my ($self, $source, @args) = @_;
1614
1615   my $bind_attrs = $self->source_bind_attributes($source);
1616
1617   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1618 }
1619
1620
1621 sub delete {
1622   my ($self, $source, @args) = @_;
1623
1624   my $bind_attrs = $self->source_bind_attributes($source);
1625
1626   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1627 }
1628
1629 # We were sent here because the $rs contains a complex search
1630 # which will require a subquery to select the correct rows
1631 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1632 #
1633 # Generating a single PK column subquery is trivial and supported
1634 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1635 # Look at _multipk_update_delete()
1636 sub _subq_update_delete {
1637   my $self = shift;
1638   my ($rs, $op, $values) = @_;
1639
1640   my $rsrc = $rs->result_source;
1641
1642   # quick check if we got a sane rs on our hands
1643   my @pcols = $rsrc->_pri_cols;
1644
1645   my $sel = $rs->_resolved_attrs->{select};
1646   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1647
1648   if (
1649       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1650         ne
1651       join ("\x00", sort @$sel )
1652   ) {
1653     $self->throw_exception (
1654       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1655     );
1656   }
1657
1658   if (@pcols == 1) {
1659     return $self->$op (
1660       $rsrc,
1661       $op eq 'update' ? $values : (),
1662       { $pcols[0] => { -in => $rs->as_query } },
1663     );
1664   }
1665
1666   else {
1667     return $self->_multipk_update_delete (@_);
1668   }
1669 }
1670
1671 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1672 # resultset update/delete involving subqueries. So by default resort
1673 # to simple (and inefficient) delete_all style per-row opearations,
1674 # while allowing specific storages to override this with a faster
1675 # implementation.
1676 #
1677 sub _multipk_update_delete {
1678   return shift->_per_row_update_delete (@_);
1679 }
1680
1681 # This is the default loop used to delete/update rows for multi PK
1682 # resultsets, and used by mysql exclusively (because it can't do anything
1683 # else).
1684 #
1685 # We do not use $row->$op style queries, because resultset update/delete
1686 # is not expected to cascade (this is what delete_all/update_all is for).
1687 #
1688 # There should be no race conditions as the entire operation is rolled
1689 # in a transaction.
1690 #
1691 sub _per_row_update_delete {
1692   my $self = shift;
1693   my ($rs, $op, $values) = @_;
1694
1695   my $rsrc = $rs->result_source;
1696   my @pcols = $rsrc->_pri_cols;
1697
1698   my $guard = $self->txn_scope_guard;
1699
1700   # emulate the return value of $sth->execute for non-selects
1701   my $row_cnt = '0E0';
1702
1703   my $subrs_cur = $rs->cursor;
1704   my @all_pk = $subrs_cur->all;
1705   for my $pks ( @all_pk) {
1706
1707     my $cond;
1708     for my $i (0.. $#pcols) {
1709       $cond->{$pcols[$i]} = $pks->[$i];
1710     }
1711
1712     $self->$op (
1713       $rsrc,
1714       $op eq 'update' ? $values : (),
1715       $cond,
1716     );
1717
1718     $row_cnt++;
1719   }
1720
1721   $guard->commit;
1722
1723   return $row_cnt;
1724 }
1725
1726 sub _select {
1727   my $self = shift;
1728
1729   # localization is neccessary as
1730   # 1) there is no infrastructure to pass this around before SQLA2
1731   # 2) _select_args sets it and _prep_for_execute consumes it
1732   my $sql_maker = $self->sql_maker;
1733   local $sql_maker->{_dbic_rs_attrs};
1734
1735   return $self->_execute($self->_select_args(@_));
1736 }
1737
1738 sub _select_args_to_query {
1739   my $self = shift;
1740
1741   # localization is neccessary as
1742   # 1) there is no infrastructure to pass this around before SQLA2
1743   # 2) _select_args sets it and _prep_for_execute consumes it
1744   my $sql_maker = $self->sql_maker;
1745   local $sql_maker->{_dbic_rs_attrs};
1746
1747   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1748   #  = $self->_select_args($ident, $select, $cond, $attrs);
1749   my ($op, $bind, $ident, $bind_attrs, @args) =
1750     $self->_select_args(@_);
1751
1752   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1753   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1754   $prepared_bind ||= [];
1755
1756   return wantarray
1757     ? ($sql, $prepared_bind, $bind_attrs)
1758     : \[ "($sql)", @$prepared_bind ]
1759   ;
1760 }
1761
1762 sub _select_args {
1763   my ($self, $ident, $select, $where, $attrs) = @_;
1764
1765   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1766
1767   my $sql_maker = $self->sql_maker;
1768   $sql_maker->{_dbic_rs_attrs} = {
1769     %$attrs,
1770     select => $select,
1771     from => $ident,
1772     where => $where,
1773     $rs_alias && $alias2source->{$rs_alias}
1774       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1775       : ()
1776     ,
1777   };
1778
1779   # calculate bind_attrs before possible $ident mangling
1780   my $bind_attrs = {};
1781   for my $alias (keys %$alias2source) {
1782     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1783     for my $col (keys %$bindtypes) {
1784
1785       my $fqcn = join ('.', $alias, $col);
1786       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1787
1788       # Unqialified column names are nice, but at the same time can be
1789       # rather ambiguous. What we do here is basically go along with
1790       # the loop, adding an unqualified column slot to $bind_attrs,
1791       # alongside the fully qualified name. As soon as we encounter
1792       # another column by that name (which would imply another table)
1793       # we unset the unqualified slot and never add any info to it
1794       # to avoid erroneous type binding. If this happens the users
1795       # only choice will be to fully qualify his column name
1796
1797       if (exists $bind_attrs->{$col}) {
1798         $bind_attrs->{$col} = {};
1799       }
1800       else {
1801         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1802       }
1803     }
1804   }
1805
1806   # adjust limits
1807   if (
1808     $attrs->{software_limit}
1809       ||
1810     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1811   ) {
1812     $attrs->{software_limit} = 1;
1813   }
1814   else {
1815     $self->throw_exception("rows attribute must be positive if present")
1816       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1817
1818     # MySQL actually recommends this approach.  I cringe.
1819     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1820   }
1821
1822   my @limit;
1823
1824   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
1825   # storage, unless software limit was requested
1826   if (
1827     #limited has_many
1828     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1829        ||
1830     # limited prefetch with RNO subqueries
1831     (
1832       $attrs->{rows}
1833         &&
1834       $sql_maker->limit_dialect eq 'RowNumberOver'
1835         &&
1836       $attrs->{_prefetch_select}
1837         &&
1838       @{$attrs->{_prefetch_select}}
1839     )
1840       ||
1841     # grouped prefetch
1842     ( $attrs->{group_by}
1843         &&
1844       @{$attrs->{group_by}}
1845         &&
1846       $attrs->{_prefetch_select}
1847         &&
1848       @{$attrs->{_prefetch_select}}
1849     )
1850   ) {
1851     ($ident, $select, $where, $attrs)
1852       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1853   }
1854
1855   elsif (
1856     ($attrs->{rows} || $attrs->{offset})
1857       &&
1858     $sql_maker->limit_dialect eq 'RowNumberOver'
1859       &&
1860     (ref $ident eq 'ARRAY' && @$ident > 1)  # indicates a join
1861       &&
1862     scalar $self->_parse_order_by ($attrs->{order_by})
1863   ) {
1864     # the RNO limit dialect above mangles the SQL such that the join gets lost
1865     # wrap a subquery here
1866
1867     push @limit, delete @{$attrs}{qw/rows offset/};
1868
1869     my $subq = $self->_select_args_to_query (
1870       $ident,
1871       $select,
1872       $where,
1873       $attrs,
1874     );
1875
1876     $ident = {
1877       -alias => $attrs->{alias},
1878       -source_handle => $ident->[0]{-source_handle},
1879       $attrs->{alias} => $subq,
1880     };
1881
1882     # all part of the subquery now
1883     delete @{$attrs}{qw/order_by group_by having/};
1884     $where = undef;
1885   }
1886
1887   elsif (! $attrs->{software_limit} ) {
1888     push @limit, $attrs->{rows}, $attrs->{offset};
1889   }
1890
1891   # try to simplify the joinmap further (prune unreferenced type-single joins)
1892   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
1893
1894 ###
1895   # This would be the point to deflate anything found in $where
1896   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1897   # expect a row object. And all we have is a resultsource (it is trivial
1898   # to extract deflator coderefs via $alias2source above).
1899   #
1900   # I don't see a way forward other than changing the way deflators are
1901   # invoked, and that's just bad...
1902 ###
1903
1904   my $order = { map
1905     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1906     (qw/order_by group_by having/ )
1907   };
1908
1909   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1910 }
1911
1912 # Returns a counting SELECT for a simple count
1913 # query. Abstracted so that a storage could override
1914 # this to { count => 'firstcol' } or whatever makes
1915 # sense as a performance optimization
1916 sub _count_select {
1917   #my ($self, $source, $rs_attrs) = @_;
1918   return { count => '*' };
1919 }
1920
1921 # Returns a SELECT which will end up in the subselect
1922 # There may or may not be a group_by, as the subquery
1923 # might have been called to accomodate a limit
1924 #
1925 # Most databases would be happy with whatever ends up
1926 # here, but some choke in various ways.
1927 #
1928 sub _subq_count_select {
1929   my ($self, $source, $rs_attrs) = @_;
1930
1931   if (my $groupby = $rs_attrs->{group_by}) {
1932
1933     my $avail_columns = $self->_resolve_column_info ($rs_attrs->{from});
1934
1935     my $sel_index;
1936     for my $sel (@{$rs_attrs->{select}}) {
1937       if (ref $sel eq 'HASH' and $sel->{-as}) {
1938         $sel_index->{$sel->{-as}} = $sel;
1939       }
1940     }
1941
1942     my @selection;
1943     for my $g_part (@$groupby) {
1944       if (ref $g_part or $avail_columns->{$g_part}) {
1945         push @selection, $g_part;
1946       }
1947       elsif ($sel_index->{$g_part}) {
1948         push @selection, $sel_index->{$g_part};
1949       }
1950       else {
1951         $self->throw_exception ("group_by criteria '$g_part' not contained within current resultset source(s)");
1952       }
1953     }
1954
1955     return \@selection;
1956   }
1957
1958   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1959   return @pcols ? \@pcols : [ 1 ];
1960 }
1961
1962 sub source_bind_attributes {
1963   my ($self, $source) = @_;
1964
1965   my $bind_attributes;
1966   foreach my $column ($source->columns) {
1967
1968     my $data_type = $source->column_info($column)->{data_type} || '';
1969     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1970      if $data_type;
1971   }
1972
1973   return $bind_attributes;
1974 }
1975
1976 =head2 select
1977
1978 =over 4
1979
1980 =item Arguments: $ident, $select, $condition, $attrs
1981
1982 =back
1983
1984 Handle a SQL select statement.
1985
1986 =cut
1987
1988 sub select {
1989   my $self = shift;
1990   my ($ident, $select, $condition, $attrs) = @_;
1991   return $self->cursor_class->new($self, \@_, $attrs);
1992 }
1993
1994 sub select_single {
1995   my $self = shift;
1996   my ($rv, $sth, @bind) = $self->_select(@_);
1997   my @row = $sth->fetchrow_array;
1998   my @nextrow = $sth->fetchrow_array if @row;
1999   if(@row && @nextrow) {
2000     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2001   }
2002   # Need to call finish() to work round broken DBDs
2003   $sth->finish();
2004   return @row;
2005 }
2006
2007 =head2 sth
2008
2009 =over 4
2010
2011 =item Arguments: $sql
2012
2013 =back
2014
2015 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2016
2017 =cut
2018
2019 sub _dbh_sth {
2020   my ($self, $dbh, $sql) = @_;
2021
2022   # 3 is the if_active parameter which avoids active sth re-use
2023   my $sth = $self->disable_sth_caching
2024     ? $dbh->prepare($sql)
2025     : $dbh->prepare_cached($sql, {}, 3);
2026
2027   # XXX You would think RaiseError would make this impossible,
2028   #  but apparently that's not true :(
2029   $self->throw_exception($dbh->errstr) if !$sth;
2030
2031   $sth;
2032 }
2033
2034 sub sth {
2035   my ($self, $sql) = @_;
2036   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2037 }
2038
2039 sub _dbh_columns_info_for {
2040   my ($self, $dbh, $table) = @_;
2041
2042   if ($dbh->can('column_info')) {
2043     my %result;
2044     eval {
2045       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2046       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2047       $sth->execute();
2048       while ( my $info = $sth->fetchrow_hashref() ){
2049         my %column_info;
2050         $column_info{data_type}   = $info->{TYPE_NAME};
2051         $column_info{size}      = $info->{COLUMN_SIZE};
2052         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2053         $column_info{default_value} = $info->{COLUMN_DEF};
2054         my $col_name = $info->{COLUMN_NAME};
2055         $col_name =~ s/^\"(.*)\"$/$1/;
2056
2057         $result{$col_name} = \%column_info;
2058       }
2059     };
2060     return \%result if !$@ && scalar keys %result;
2061   }
2062
2063   my %result;
2064   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2065   $sth->execute;
2066   my @columns = @{$sth->{NAME_lc}};
2067   for my $i ( 0 .. $#columns ){
2068     my %column_info;
2069     $column_info{data_type} = $sth->{TYPE}->[$i];
2070     $column_info{size} = $sth->{PRECISION}->[$i];
2071     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2072
2073     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2074       $column_info{data_type} = $1;
2075       $column_info{size}    = $2;
2076     }
2077
2078     $result{$columns[$i]} = \%column_info;
2079   }
2080   $sth->finish;
2081
2082   foreach my $col (keys %result) {
2083     my $colinfo = $result{$col};
2084     my $type_num = $colinfo->{data_type};
2085     my $type_name;
2086     if(defined $type_num && $dbh->can('type_info')) {
2087       my $type_info = $dbh->type_info($type_num);
2088       $type_name = $type_info->{TYPE_NAME} if $type_info;
2089       $colinfo->{data_type} = $type_name if $type_name;
2090     }
2091   }
2092
2093   return \%result;
2094 }
2095
2096 sub columns_info_for {
2097   my ($self, $table) = @_;
2098   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2099 }
2100
2101 =head2 last_insert_id
2102
2103 Return the row id of the last insert.
2104
2105 =cut
2106
2107 sub _dbh_last_insert_id {
2108     my ($self, $dbh, $source, $col) = @_;
2109
2110     my $id = eval { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2111
2112     return $id if defined $id;
2113
2114     my $class = ref $self;
2115     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2116 }
2117
2118 sub last_insert_id {
2119   my $self = shift;
2120   $self->_dbh_last_insert_id ($self->_dbh, @_);
2121 }
2122
2123 =head2 _native_data_type
2124
2125 =over 4
2126
2127 =item Arguments: $type_name
2128
2129 =back
2130
2131 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2132 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2133 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2134
2135 The default implementation returns C<undef>, implement in your Storage driver if
2136 you need this functionality.
2137
2138 Should map types from other databases to the native RDBMS type, for example
2139 C<VARCHAR2> to C<VARCHAR>.
2140
2141 Types with modifiers should map to the underlying data type. For example,
2142 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2143
2144 Composite types should map to the container type, for example
2145 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2146
2147 =cut
2148
2149 sub _native_data_type {
2150   #my ($self, $data_type) = @_;
2151   return undef
2152 }
2153
2154 # Check if placeholders are supported at all
2155 sub _placeholders_supported {
2156   my $self = shift;
2157   my $dbh  = $self->_get_dbh;
2158
2159   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2160   # but it is inaccurate more often than not
2161   eval {
2162     local $dbh->{PrintError} = 0;
2163     local $dbh->{RaiseError} = 1;
2164     $dbh->do('select ?', {}, 1);
2165   };
2166   return $@ ? 0 : 1;
2167 }
2168
2169 # Check if placeholders bound to non-string types throw exceptions
2170 #
2171 sub _typeless_placeholders_supported {
2172   my $self = shift;
2173   my $dbh  = $self->_get_dbh;
2174
2175   eval {
2176     local $dbh->{PrintError} = 0;
2177     local $dbh->{RaiseError} = 1;
2178     # this specifically tests a bind that is NOT a string
2179     $dbh->do('select 1 where 1 = ?', {}, 1);
2180   };
2181   return $@ ? 0 : 1;
2182 }
2183
2184 =head2 sqlt_type
2185
2186 Returns the database driver name.
2187
2188 =cut
2189
2190 sub sqlt_type {
2191   shift->_get_dbh->{Driver}->{Name};
2192 }
2193
2194 =head2 bind_attribute_by_data_type
2195
2196 Given a datatype from column info, returns a database specific bind
2197 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2198 let the database planner just handle it.
2199
2200 Generally only needed for special case column types, like bytea in postgres.
2201
2202 =cut
2203
2204 sub bind_attribute_by_data_type {
2205     return;
2206 }
2207
2208 =head2 is_datatype_numeric
2209
2210 Given a datatype from column_info, returns a boolean value indicating if
2211 the current RDBMS considers it a numeric value. This controls how
2212 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2213 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2214 be performed instead of the usual C<eq>.
2215
2216 =cut
2217
2218 sub is_datatype_numeric {
2219   my ($self, $dt) = @_;
2220
2221   return 0 unless $dt;
2222
2223   return $dt =~ /^ (?:
2224     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2225   ) $/ix;
2226 }
2227
2228
2229 =head2 create_ddl_dir
2230
2231 =over 4
2232
2233 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2234
2235 =back
2236
2237 Creates a SQL file based on the Schema, for each of the specified
2238 database engines in C<\@databases> in the given directory.
2239 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2240
2241 Given a previous version number, this will also create a file containing
2242 the ALTER TABLE statements to transform the previous schema into the
2243 current one. Note that these statements may contain C<DROP TABLE> or
2244 C<DROP COLUMN> statements that can potentially destroy data.
2245
2246 The file names are created using the C<ddl_filename> method below, please
2247 override this method in your schema if you would like a different file
2248 name format. For the ALTER file, the same format is used, replacing
2249 $version in the name with "$preversion-$version".
2250
2251 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2252 The most common value for this would be C<< { add_drop_table => 1 } >>
2253 to have the SQL produced include a C<DROP TABLE> statement for each table
2254 created. For quoting purposes supply C<quote_table_names> and
2255 C<quote_field_names>.
2256
2257 If no arguments are passed, then the following default values are assumed:
2258
2259 =over 4
2260
2261 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2262
2263 =item version    - $schema->schema_version
2264
2265 =item directory  - './'
2266
2267 =item preversion - <none>
2268
2269 =back
2270
2271 By default, C<\%sqlt_args> will have
2272
2273  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2274
2275 merged with the hash passed in. To disable any of those features, pass in a
2276 hashref like the following
2277
2278  { ignore_constraint_names => 0, # ... other options }
2279
2280
2281 WARNING: You are strongly advised to check all SQL files created, before applying
2282 them.
2283
2284 =cut
2285
2286 sub create_ddl_dir {
2287   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2288
2289   unless ($dir) {
2290     carp "No directory given, using ./\n";
2291     $dir = './';
2292   }
2293
2294   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2295
2296   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2297   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2298
2299   my $schema_version = $schema->schema_version || '1.x';
2300   $version ||= $schema_version;
2301
2302   $sqltargs = {
2303     add_drop_table => 1,
2304     ignore_constraint_names => 1,
2305     ignore_index_names => 1,
2306     %{$sqltargs || {}}
2307   };
2308
2309   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2310     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2311   }
2312
2313   my $sqlt = SQL::Translator->new( $sqltargs );
2314
2315   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2316   my $sqlt_schema = $sqlt->translate({ data => $schema })
2317     or $self->throw_exception ($sqlt->error);
2318
2319   foreach my $db (@$databases) {
2320     $sqlt->reset();
2321     $sqlt->{schema} = $sqlt_schema;
2322     $sqlt->producer($db);
2323
2324     my $file;
2325     my $filename = $schema->ddl_filename($db, $version, $dir);
2326     if (-e $filename && ($version eq $schema_version )) {
2327       # if we are dumping the current version, overwrite the DDL
2328       carp "Overwriting existing DDL file - $filename";
2329       unlink($filename);
2330     }
2331
2332     my $output = $sqlt->translate;
2333     if(!$output) {
2334       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2335       next;
2336     }
2337     if(!open($file, ">$filename")) {
2338       $self->throw_exception("Can't open $filename for writing ($!)");
2339       next;
2340     }
2341     print $file $output;
2342     close($file);
2343
2344     next unless ($preversion);
2345
2346     require SQL::Translator::Diff;
2347
2348     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2349     if(!-e $prefilename) {
2350       carp("No previous schema file found ($prefilename)");
2351       next;
2352     }
2353
2354     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2355     if(-e $difffile) {
2356       carp("Overwriting existing diff file - $difffile");
2357       unlink($difffile);
2358     }
2359
2360     my $source_schema;
2361     {
2362       my $t = SQL::Translator->new($sqltargs);
2363       $t->debug( 0 );
2364       $t->trace( 0 );
2365
2366       $t->parser( $db )
2367         or $self->throw_exception ($t->error);
2368
2369       my $out = $t->translate( $prefilename )
2370         or $self->throw_exception ($t->error);
2371
2372       $source_schema = $t->schema;
2373
2374       $source_schema->name( $prefilename )
2375         unless ( $source_schema->name );
2376     }
2377
2378     # The "new" style of producers have sane normalization and can support
2379     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2380     # And we have to diff parsed SQL against parsed SQL.
2381     my $dest_schema = $sqlt_schema;
2382
2383     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2384       my $t = SQL::Translator->new($sqltargs);
2385       $t->debug( 0 );
2386       $t->trace( 0 );
2387
2388       $t->parser( $db )
2389         or $self->throw_exception ($t->error);
2390
2391       my $out = $t->translate( $filename )
2392         or $self->throw_exception ($t->error);
2393
2394       $dest_schema = $t->schema;
2395
2396       $dest_schema->name( $filename )
2397         unless $dest_schema->name;
2398     }
2399
2400     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2401                                                   $dest_schema,   $db,
2402                                                   $sqltargs
2403                                                  );
2404     if(!open $file, ">$difffile") {
2405       $self->throw_exception("Can't write to $difffile ($!)");
2406       next;
2407     }
2408     print $file $diff;
2409     close($file);
2410   }
2411 }
2412
2413 =head2 deployment_statements
2414
2415 =over 4
2416
2417 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2418
2419 =back
2420
2421 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2422
2423 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2424 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2425
2426 C<$directory> is used to return statements from files in a previously created
2427 L</create_ddl_dir> directory and is optional. The filenames are constructed
2428 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2429
2430 If no C<$directory> is specified then the statements are constructed on the
2431 fly using L<SQL::Translator> and C<$version> is ignored.
2432
2433 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2434
2435 =cut
2436
2437 sub deployment_statements {
2438   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2439   $type ||= $self->sqlt_type;
2440   $version ||= $schema->schema_version || '1.x';
2441   $dir ||= './';
2442   my $filename = $schema->ddl_filename($type, $version, $dir);
2443   if(-f $filename)
2444   {
2445       my $file;
2446       open($file, "<$filename")
2447         or $self->throw_exception("Can't open $filename ($!)");
2448       my @rows = <$file>;
2449       close($file);
2450       return join('', @rows);
2451   }
2452
2453   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2454     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2455   }
2456
2457   # sources needs to be a parser arg, but for simplicty allow at top level
2458   # coming in
2459   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2460       if exists $sqltargs->{sources};
2461
2462   my $tr = SQL::Translator->new(
2463     producer => "SQL::Translator::Producer::${type}",
2464     %$sqltargs,
2465     parser => 'SQL::Translator::Parser::DBIx::Class',
2466     data => $schema,
2467   );
2468
2469   my @ret;
2470   my $wa = wantarray;
2471   if ($wa) {
2472     @ret = $tr->translate;
2473   }
2474   else {
2475     $ret[0] = $tr->translate;
2476   }
2477
2478   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2479     unless (@ret && defined $ret[0]);
2480
2481   return $wa ? @ret : $ret[0];
2482 }
2483
2484 sub deploy {
2485   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2486   my $deploy = sub {
2487     my $line = shift;
2488     return if($line =~ /^--/);
2489     return if(!$line);
2490     # next if($line =~ /^DROP/m);
2491     return if($line =~ /^BEGIN TRANSACTION/m);
2492     return if($line =~ /^COMMIT/m);
2493     return if $line =~ /^\s+$/; # skip whitespace only
2494     $self->_query_start($line);
2495     eval {
2496       # do a dbh_do cycle here, as we need some error checking in
2497       # place (even though we will ignore errors)
2498       $self->dbh_do (sub { $_[1]->do($line) });
2499     };
2500     if ($@) {
2501       carp qq{$@ (running "${line}")};
2502     }
2503     $self->_query_end($line);
2504   };
2505   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2506   if (@statements > 1) {
2507     foreach my $statement (@statements) {
2508       $deploy->( $statement );
2509     }
2510   }
2511   elsif (@statements == 1) {
2512     foreach my $line ( split(";\n", $statements[0])) {
2513       $deploy->( $line );
2514     }
2515   }
2516 }
2517
2518 =head2 datetime_parser
2519
2520 Returns the datetime parser class
2521
2522 =cut
2523
2524 sub datetime_parser {
2525   my $self = shift;
2526   return $self->{datetime_parser} ||= do {
2527     $self->build_datetime_parser(@_);
2528   };
2529 }
2530
2531 =head2 datetime_parser_type
2532
2533 Defines (returns) the datetime parser class - currently hardwired to
2534 L<DateTime::Format::MySQL>
2535
2536 =cut
2537
2538 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2539
2540 =head2 build_datetime_parser
2541
2542 See L</datetime_parser>
2543
2544 =cut
2545
2546 sub build_datetime_parser {
2547   my $self = shift;
2548   my $type = $self->datetime_parser_type(@_);
2549   $self->ensure_class_loaded ($type);
2550   return $type;
2551 }
2552
2553
2554 =head2 is_replicating
2555
2556 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2557 replicate from a master database.  Default is undef, which is the result
2558 returned by databases that don't support replication.
2559
2560 =cut
2561
2562 sub is_replicating {
2563     return;
2564
2565 }
2566
2567 =head2 lag_behind_master
2568
2569 Returns a number that represents a certain amount of lag behind a master db
2570 when a given storage is replicating.  The number is database dependent, but
2571 starts at zero and increases with the amount of lag. Default in undef
2572
2573 =cut
2574
2575 sub lag_behind_master {
2576     return;
2577 }
2578
2579 =head2 relname_to_table_alias
2580
2581 =over 4
2582
2583 =item Arguments: $relname, $join_count
2584
2585 =back
2586
2587 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2588 queries.
2589
2590 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2591 way these aliases are named.
2592
2593 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2594 otherwise C<"$relname">.
2595
2596 =cut
2597
2598 sub relname_to_table_alias {
2599   my ($self, $relname, $join_count) = @_;
2600
2601   my $alias = ($join_count && $join_count > 1 ?
2602     join('_', $relname, $join_count) : $relname);
2603
2604   return $alias;
2605 }
2606
2607 sub DESTROY {
2608   my $self = shift;
2609
2610   $self->_verify_pid if $self->_dbh;
2611
2612   # some databases need this to stop spewing warnings
2613   if (my $dbh = $self->_dbh) {
2614     local $@;
2615     eval {
2616       %{ $dbh->{CachedKids} } = ();
2617       $dbh->disconnect;
2618     };
2619   }
2620
2621   $self->_dbh(undef);
2622 }
2623
2624 1;
2625
2626 =head1 USAGE NOTES
2627
2628 =head2 DBIx::Class and AutoCommit
2629
2630 DBIx::Class can do some wonderful magic with handling exceptions,
2631 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2632 (the default) combined with C<txn_do> for transaction support.
2633
2634 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2635 in an assumed transaction between commits, and you're telling us you'd
2636 like to manage that manually.  A lot of the magic protections offered by
2637 this module will go away.  We can't protect you from exceptions due to database
2638 disconnects because we don't know anything about how to restart your
2639 transactions.  You're on your own for handling all sorts of exceptional
2640 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2641 be with raw DBI.
2642
2643
2644 =head1 AUTHORS
2645
2646 Matt S. Trout <mst@shadowcatsystems.co.uk>
2647
2648 Andy Grundman <andy@hybridized.org>
2649
2650 =head1 LICENSE
2651
2652 You may distribute this code under the same terms as Perl itself.
2653
2654 =cut