Refactor the version handling
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17 use Sub::Name ();
18
19 __PACKAGE__->mk_group_accessors('simple' =>
20   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
21      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints
22      _server_info_hash/
23 );
24
25 # the values for these accessors are picked out (and deleted) from
26 # the attribute hashref passed to connect_info
27 my @storage_options = qw/
28   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
29   disable_sth_caching unsafe auto_savepoint
30 /;
31 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
32
33
34 # default cursor class, overridable in connect_info attributes
35 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
36
37 __PACKAGE__->mk_group_accessors('inherited' => qw/
38   sql_maker_class
39   _supports_insert_returning
40 /);
41 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
42
43
44 # Each of these methods need _determine_driver called before itself
45 # in order to function reliably. This is a purely DRY optimization
46 my @rdbms_specific_methods = qw/
47   deployment_statements
48   sqlt_type
49   build_datetime_parser
50   datetime_parser_type
51
52   insert
53   insert_bulk
54   update
55   delete
56   select
57   select_single
58 /;
59
60 for my $meth (@rdbms_specific_methods) {
61
62   my $orig = __PACKAGE__->can ($meth)
63     or next;
64
65   no strict qw/refs/;
66   no warnings qw/redefine/;
67   *{__PACKAGE__ ."::$meth"} = Sub::Name::subname $meth => sub {
68     if (not $_[0]->_driver_determined) {
69       $_[0]->_determine_driver;
70       goto $_[0]->can($meth);
71     }
72     $orig->(@_);
73   };
74 }
75
76
77 =head1 NAME
78
79 DBIx::Class::Storage::DBI - DBI storage handler
80
81 =head1 SYNOPSIS
82
83   my $schema = MySchema->connect('dbi:SQLite:my.db');
84
85   $schema->storage->debug(1);
86
87   my @stuff = $schema->storage->dbh_do(
88     sub {
89       my ($storage, $dbh, @args) = @_;
90       $dbh->do("DROP TABLE authors");
91     },
92     @column_list
93   );
94
95   $schema->resultset('Book')->search({
96      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
97   });
98
99 =head1 DESCRIPTION
100
101 This class represents the connection to an RDBMS via L<DBI>.  See
102 L<DBIx::Class::Storage> for general information.  This pod only
103 documents DBI-specific methods and behaviors.
104
105 =head1 METHODS
106
107 =cut
108
109 sub new {
110   my $new = shift->next::method(@_);
111
112   $new->transaction_depth(0);
113   $new->_sql_maker_opts({});
114   $new->{savepoints} = [];
115   $new->{_in_dbh_do} = 0;
116   $new->{_dbh_gen} = 0;
117
118   $new;
119 }
120
121 =head2 connect_info
122
123 This method is normally called by L<DBIx::Class::Schema/connection>, which
124 encapsulates its argument list in an arrayref before passing them here.
125
126 The argument list may contain:
127
128 =over
129
130 =item *
131
132 The same 4-element argument set one would normally pass to
133 L<DBI/connect>, optionally followed by
134 L<extra attributes|/DBIx::Class specific connection attributes>
135 recognized by DBIx::Class:
136
137   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
138
139 =item *
140
141 A single code reference which returns a connected
142 L<DBI database handle|DBI/connect> optionally followed by
143 L<extra attributes|/DBIx::Class specific connection attributes> recognized
144 by DBIx::Class:
145
146   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
147
148 =item *
149
150 A single hashref with all the attributes and the dsn/user/password
151 mixed together:
152
153   $connect_info_args = [{
154     dsn => $dsn,
155     user => $user,
156     password => $pass,
157     %dbi_attributes,
158     %extra_attributes,
159   }];
160
161   $connect_info_args = [{
162     dbh_maker => sub { DBI->connect (...) },
163     %dbi_attributes,
164     %extra_attributes,
165   }];
166
167 This is particularly useful for L<Catalyst> based applications, allowing the
168 following config (L<Config::General> style):
169
170   <Model::DB>
171     schema_class   App::DB
172     <connect_info>
173       dsn          dbi:mysql:database=test
174       user         testuser
175       password     TestPass
176       AutoCommit   1
177     </connect_info>
178   </Model::DB>
179
180 The C<dsn>/C<user>/C<password> combination can be substituted by the
181 C<dbh_maker> key whose value is a coderef that returns a connected
182 L<DBI database handle|DBI/connect>
183
184 =back
185
186 Please note that the L<DBI> docs recommend that you always explicitly
187 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
188 recommends that it be set to I<1>, and that you perform transactions
189 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
190 to I<1> if you do not do explicitly set it to zero.  This is the default
191 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
192
193 =head3 DBIx::Class specific connection attributes
194
195 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
196 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
197 the following connection options. These options can be mixed in with your other
198 L<DBI> connection attributes, or placed in a separate hashref
199 (C<\%extra_attributes>) as shown above.
200
201 Every time C<connect_info> is invoked, any previous settings for
202 these options will be cleared before setting the new ones, regardless of
203 whether any options are specified in the new C<connect_info>.
204
205
206 =over
207
208 =item on_connect_do
209
210 Specifies things to do immediately after connecting or re-connecting to
211 the database.  Its value may contain:
212
213 =over
214
215 =item a scalar
216
217 This contains one SQL statement to execute.
218
219 =item an array reference
220
221 This contains SQL statements to execute in order.  Each element contains
222 a string or a code reference that returns a string.
223
224 =item a code reference
225
226 This contains some code to execute.  Unlike code references within an
227 array reference, its return value is ignored.
228
229 =back
230
231 =item on_disconnect_do
232
233 Takes arguments in the same form as L</on_connect_do> and executes them
234 immediately before disconnecting from the database.
235
236 Note, this only runs if you explicitly call L</disconnect> on the
237 storage object.
238
239 =item on_connect_call
240
241 A more generalized form of L</on_connect_do> that calls the specified
242 C<connect_call_METHOD> methods in your storage driver.
243
244   on_connect_do => 'select 1'
245
246 is equivalent to:
247
248   on_connect_call => [ [ do_sql => 'select 1' ] ]
249
250 Its values may contain:
251
252 =over
253
254 =item a scalar
255
256 Will call the C<connect_call_METHOD> method.
257
258 =item a code reference
259
260 Will execute C<< $code->($storage) >>
261
262 =item an array reference
263
264 Each value can be a method name or code reference.
265
266 =item an array of arrays
267
268 For each array, the first item is taken to be the C<connect_call_> method name
269 or code reference, and the rest are parameters to it.
270
271 =back
272
273 Some predefined storage methods you may use:
274
275 =over
276
277 =item do_sql
278
279 Executes a SQL string or a code reference that returns a SQL string. This is
280 what L</on_connect_do> and L</on_disconnect_do> use.
281
282 It can take:
283
284 =over
285
286 =item a scalar
287
288 Will execute the scalar as SQL.
289
290 =item an arrayref
291
292 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
293 attributes hashref and bind values.
294
295 =item a code reference
296
297 Will execute C<< $code->($storage) >> and execute the return array refs as
298 above.
299
300 =back
301
302 =item datetime_setup
303
304 Execute any statements necessary to initialize the database session to return
305 and accept datetime/timestamp values used with
306 L<DBIx::Class::InflateColumn::DateTime>.
307
308 Only necessary for some databases, see your specific storage driver for
309 implementation details.
310
311 =back
312
313 =item on_disconnect_call
314
315 Takes arguments in the same form as L</on_connect_call> and executes them
316 immediately before disconnecting from the database.
317
318 Calls the C<disconnect_call_METHOD> methods as opposed to the
319 C<connect_call_METHOD> methods called by L</on_connect_call>.
320
321 Note, this only runs if you explicitly call L</disconnect> on the
322 storage object.
323
324 =item disable_sth_caching
325
326 If set to a true value, this option will disable the caching of
327 statement handles via L<DBI/prepare_cached>.
328
329 =item limit_dialect
330
331 Sets the limit dialect. This is useful for JDBC-bridge among others
332 where the remote SQL-dialect cannot be determined by the name of the
333 driver alone. See also L<SQL::Abstract::Limit>.
334
335 =item quote_char
336
337 Specifies what characters to use to quote table and column names. If
338 you use this you will want to specify L</name_sep> as well.
339
340 C<quote_char> expects either a single character, in which case is it
341 is placed on either side of the table/column name, or an arrayref of length
342 2 in which case the table/column name is placed between the elements.
343
344 For example under MySQL you should use C<< quote_char => '`' >>, and for
345 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
346
347 =item name_sep
348
349 This only needs to be used in conjunction with C<quote_char>, and is used to
350 specify the character that separates elements (schemas, tables, columns) from
351 each other. In most cases this is simply a C<.>.
352
353 The consequences of not supplying this value is that L<SQL::Abstract>
354 will assume DBIx::Class' uses of aliases to be complete column
355 names. The output will look like I<"me.name"> when it should actually
356 be I<"me"."name">.
357
358 =item unsafe
359
360 This Storage driver normally installs its own C<HandleError>, sets
361 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
362 all database handles, including those supplied by a coderef.  It does this
363 so that it can have consistent and useful error behavior.
364
365 If you set this option to a true value, Storage will not do its usual
366 modifications to the database handle's attributes, and instead relies on
367 the settings in your connect_info DBI options (or the values you set in
368 your connection coderef, in the case that you are connecting via coderef).
369
370 Note that your custom settings can cause Storage to malfunction,
371 especially if you set a C<HandleError> handler that suppresses exceptions
372 and/or disable C<RaiseError>.
373
374 =item auto_savepoint
375
376 If this option is true, L<DBIx::Class> will use savepoints when nesting
377 transactions, making it possible to recover from failure in the inner
378 transaction without having to abort all outer transactions.
379
380 =item cursor_class
381
382 Use this argument to supply a cursor class other than the default
383 L<DBIx::Class::Storage::DBI::Cursor>.
384
385 =back
386
387 Some real-life examples of arguments to L</connect_info> and
388 L<DBIx::Class::Schema/connect>
389
390   # Simple SQLite connection
391   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
392
393   # Connect via subref
394   ->connect_info([ sub { DBI->connect(...) } ]);
395
396   # Connect via subref in hashref
397   ->connect_info([{
398     dbh_maker => sub { DBI->connect(...) },
399     on_connect_do => 'alter session ...',
400   }]);
401
402   # A bit more complicated
403   ->connect_info(
404     [
405       'dbi:Pg:dbname=foo',
406       'postgres',
407       'my_pg_password',
408       { AutoCommit => 1 },
409       { quote_char => q{"}, name_sep => q{.} },
410     ]
411   );
412
413   # Equivalent to the previous example
414   ->connect_info(
415     [
416       'dbi:Pg:dbname=foo',
417       'postgres',
418       'my_pg_password',
419       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
420     ]
421   );
422
423   # Same, but with hashref as argument
424   # See parse_connect_info for explanation
425   ->connect_info(
426     [{
427       dsn         => 'dbi:Pg:dbname=foo',
428       user        => 'postgres',
429       password    => 'my_pg_password',
430       AutoCommit  => 1,
431       quote_char  => q{"},
432       name_sep    => q{.},
433     }]
434   );
435
436   # Subref + DBIx::Class-specific connection options
437   ->connect_info(
438     [
439       sub { DBI->connect(...) },
440       {
441           quote_char => q{`},
442           name_sep => q{@},
443           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
444           disable_sth_caching => 1,
445       },
446     ]
447   );
448
449
450
451 =cut
452
453 sub connect_info {
454   my ($self, $info) = @_;
455
456   return $self->_connect_info if !$info;
457
458   $self->_connect_info($info); # copy for _connect_info
459
460   $info = $self->_normalize_connect_info($info)
461     if ref $info eq 'ARRAY';
462
463   for my $storage_opt (keys %{ $info->{storage_options} }) {
464     my $value = $info->{storage_options}{$storage_opt};
465
466     $self->$storage_opt($value);
467   }
468
469   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
470   #  the new set of options
471   $self->_sql_maker(undef);
472   $self->_sql_maker_opts({});
473
474   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
475     my $value = $info->{sql_maker_options}{$sql_maker_opt};
476
477     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
478   }
479
480   my %attrs = (
481     %{ $self->_default_dbi_connect_attributes || {} },
482     %{ $info->{attributes} || {} },
483   );
484
485   my @args = @{ $info->{arguments} };
486
487   $self->_dbi_connect_info([@args,
488     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
489
490   return $self->_connect_info;
491 }
492
493 sub _normalize_connect_info {
494   my ($self, $info_arg) = @_;
495   my %info;
496
497   my @args = @$info_arg;  # take a shallow copy for further mutilation
498
499   # combine/pre-parse arguments depending on invocation style
500
501   my %attrs;
502   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
503     %attrs = %{ $args[1] || {} };
504     @args = $args[0];
505   }
506   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
507     %attrs = %{$args[0]};
508     @args = ();
509     if (my $code = delete $attrs{dbh_maker}) {
510       @args = $code;
511
512       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
513       if (@ignored) {
514         carp sprintf (
515             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
516           . "to the result of 'dbh_maker'",
517
518           join (', ', map { "'$_'" } (@ignored) ),
519         );
520       }
521     }
522     else {
523       @args = delete @attrs{qw/dsn user password/};
524     }
525   }
526   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
527     %attrs = (
528       % { $args[3] || {} },
529       % { $args[4] || {} },
530     );
531     @args = @args[0,1,2];
532   }
533
534   $info{arguments} = \@args;
535
536   my @storage_opts = grep exists $attrs{$_},
537     @storage_options, 'cursor_class';
538
539   @{ $info{storage_options} }{@storage_opts} =
540     delete @attrs{@storage_opts} if @storage_opts;
541
542   my @sql_maker_opts = grep exists $attrs{$_},
543     qw/limit_dialect quote_char name_sep/;
544
545   @{ $info{sql_maker_options} }{@sql_maker_opts} =
546     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
547
548   $info{attributes} = \%attrs if %attrs;
549
550   return \%info;
551 }
552
553 sub _default_dbi_connect_attributes {
554   return {
555     AutoCommit => 1,
556     RaiseError => 1,
557     PrintError => 0,
558   };
559 }
560
561 =head2 on_connect_do
562
563 This method is deprecated in favour of setting via L</connect_info>.
564
565 =cut
566
567 =head2 on_disconnect_do
568
569 This method is deprecated in favour of setting via L</connect_info>.
570
571 =cut
572
573 sub _parse_connect_do {
574   my ($self, $type) = @_;
575
576   my $val = $self->$type;
577   return () if not defined $val;
578
579   my @res;
580
581   if (not ref($val)) {
582     push @res, [ 'do_sql', $val ];
583   } elsif (ref($val) eq 'CODE') {
584     push @res, $val;
585   } elsif (ref($val) eq 'ARRAY') {
586     push @res, map { [ 'do_sql', $_ ] } @$val;
587   } else {
588     $self->throw_exception("Invalid type for $type: ".ref($val));
589   }
590
591   return \@res;
592 }
593
594 =head2 dbh_do
595
596 Arguments: ($subref | $method_name), @extra_coderef_args?
597
598 Execute the given $subref or $method_name using the new exception-based
599 connection management.
600
601 The first two arguments will be the storage object that C<dbh_do> was called
602 on and a database handle to use.  Any additional arguments will be passed
603 verbatim to the called subref as arguments 2 and onwards.
604
605 Using this (instead of $self->_dbh or $self->dbh) ensures correct
606 exception handling and reconnection (or failover in future subclasses).
607
608 Your subref should have no side-effects outside of the database, as
609 there is the potential for your subref to be partially double-executed
610 if the database connection was stale/dysfunctional.
611
612 Example:
613
614   my @stuff = $schema->storage->dbh_do(
615     sub {
616       my ($storage, $dbh, @cols) = @_;
617       my $cols = join(q{, }, @cols);
618       $dbh->selectrow_array("SELECT $cols FROM foo");
619     },
620     @column_list
621   );
622
623 =cut
624
625 sub dbh_do {
626   my $self = shift;
627   my $code = shift;
628
629   my $dbh = $self->_get_dbh;
630
631   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
632       || $self->{transaction_depth};
633
634   local $self->{_in_dbh_do} = 1;
635
636   my @result;
637   my $want_array = wantarray;
638
639   eval {
640
641     if($want_array) {
642         @result = $self->$code($dbh, @_);
643     }
644     elsif(defined $want_array) {
645         $result[0] = $self->$code($dbh, @_);
646     }
647     else {
648         $self->$code($dbh, @_);
649     }
650   };
651
652   # ->connected might unset $@ - copy
653   my $exception = $@;
654   if(!$exception) { return $want_array ? @result : $result[0] }
655
656   $self->throw_exception($exception) if $self->connected;
657
658   # We were not connected - reconnect and retry, but let any
659   #  exception fall right through this time
660   carp "Retrying $code after catching disconnected exception: $exception"
661     if $ENV{DBIC_DBIRETRY_DEBUG};
662   $self->_populate_dbh;
663   $self->$code($self->_dbh, @_);
664 }
665
666 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
667 # It also informs dbh_do to bypass itself while under the direction of txn_do,
668 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
669 sub txn_do {
670   my $self = shift;
671   my $coderef = shift;
672
673   ref $coderef eq 'CODE' or $self->throw_exception
674     ('$coderef must be a CODE reference');
675
676   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
677
678   local $self->{_in_dbh_do} = 1;
679
680   my @result;
681   my $want_array = wantarray;
682
683   my $tried = 0;
684   while(1) {
685     eval {
686       $self->_get_dbh;
687
688       $self->txn_begin;
689       if($want_array) {
690           @result = $coderef->(@_);
691       }
692       elsif(defined $want_array) {
693           $result[0] = $coderef->(@_);
694       }
695       else {
696           $coderef->(@_);
697       }
698       $self->txn_commit;
699     };
700
701     # ->connected might unset $@ - copy
702     my $exception = $@;
703     if(!$exception) { return $want_array ? @result : $result[0] }
704
705     if($tried++ || $self->connected) {
706       eval { $self->txn_rollback };
707       my $rollback_exception = $@;
708       if($rollback_exception) {
709         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
710         $self->throw_exception($exception)  # propagate nested rollback
711           if $rollback_exception =~ /$exception_class/;
712
713         $self->throw_exception(
714           "Transaction aborted: ${exception}. "
715           . "Rollback failed: ${rollback_exception}"
716         );
717       }
718       $self->throw_exception($exception)
719     }
720
721     # We were not connected, and was first try - reconnect and retry
722     # via the while loop
723     carp "Retrying $coderef after catching disconnected exception: $exception"
724       if $ENV{DBIC_DBIRETRY_DEBUG};
725     $self->_populate_dbh;
726   }
727 }
728
729 =head2 disconnect
730
731 Our C<disconnect> method also performs a rollback first if the
732 database is not in C<AutoCommit> mode.
733
734 =cut
735
736 sub disconnect {
737   my ($self) = @_;
738
739   if( $self->_dbh ) {
740     my @actions;
741
742     push @actions, ( $self->on_disconnect_call || () );
743     push @actions, $self->_parse_connect_do ('on_disconnect_do');
744
745     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
746
747     $self->_dbh_rollback unless $self->_dbh_autocommit;
748
749     %{ $self->_dbh->{CachedKids} } = ();
750     $self->_dbh->disconnect;
751     $self->_dbh(undef);
752     $self->{_dbh_gen}++;
753   }
754 }
755
756 =head2 with_deferred_fk_checks
757
758 =over 4
759
760 =item Arguments: C<$coderef>
761
762 =item Return Value: The return value of $coderef
763
764 =back
765
766 Storage specific method to run the code ref with FK checks deferred or
767 in MySQL's case disabled entirely.
768
769 =cut
770
771 # Storage subclasses should override this
772 sub with_deferred_fk_checks {
773   my ($self, $sub) = @_;
774   $sub->();
775 }
776
777 =head2 connected
778
779 =over
780
781 =item Arguments: none
782
783 =item Return Value: 1|0
784
785 =back
786
787 Verifies that the current database handle is active and ready to execute
788 an SQL statement (e.g. the connection did not get stale, server is still
789 answering, etc.) This method is used internally by L</dbh>.
790
791 =cut
792
793 sub connected {
794   my $self = shift;
795   return 0 unless $self->_seems_connected;
796
797   #be on the safe side
798   local $self->_dbh->{RaiseError} = 1;
799
800   return $self->_ping;
801 }
802
803 sub _seems_connected {
804   my $self = shift;
805
806   my $dbh = $self->_dbh
807     or return 0;
808
809   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
810     $self->_dbh(undef);
811     $self->{_dbh_gen}++;
812     return 0;
813   }
814   else {
815     $self->_verify_pid;
816     return 0 if !$self->_dbh;
817   }
818
819   return $dbh->FETCH('Active');
820 }
821
822 sub _ping {
823   my $self = shift;
824
825   my $dbh = $self->_dbh or return 0;
826
827   return $dbh->ping;
828 }
829
830 # handle pid changes correctly
831 #  NOTE: assumes $self->_dbh is a valid $dbh
832 sub _verify_pid {
833   my ($self) = @_;
834
835   return if defined $self->_conn_pid && $self->_conn_pid == $$;
836
837   $self->_dbh->{InactiveDestroy} = 1;
838   $self->_dbh(undef);
839   $self->{_dbh_gen}++;
840
841   return;
842 }
843
844 sub ensure_connected {
845   my ($self) = @_;
846
847   unless ($self->connected) {
848     $self->_populate_dbh;
849   }
850 }
851
852 =head2 dbh
853
854 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
855 is guaranteed to be healthy by implicitly calling L</connected>, and if
856 necessary performing a reconnection before returning. Keep in mind that this
857 is very B<expensive> on some database engines. Consider using L</dbh_do>
858 instead.
859
860 =cut
861
862 sub dbh {
863   my ($self) = @_;
864
865   if (not $self->_dbh) {
866     $self->_populate_dbh;
867   } else {
868     $self->ensure_connected;
869   }
870   return $self->_dbh;
871 }
872
873 # this is the internal "get dbh or connect (don't check)" method
874 sub _get_dbh {
875   my $self = shift;
876   $self->_verify_pid if $self->_dbh;
877   $self->_populate_dbh unless $self->_dbh;
878   return $self->_dbh;
879 }
880
881 sub _sql_maker_args {
882     my ($self) = @_;
883
884     return (
885       bindtype=>'columns',
886       array_datatypes => 1,
887       limit_dialect => $self->_get_dbh,
888       %{$self->_sql_maker_opts}
889     );
890 }
891
892 sub sql_maker {
893   my ($self) = @_;
894   unless ($self->_sql_maker) {
895     my $sql_maker_class = $self->sql_maker_class;
896     $self->ensure_class_loaded ($sql_maker_class);
897     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
898   }
899   return $self->_sql_maker;
900 }
901
902 # nothing to do by default
903 sub _rebless {}
904 sub _init {}
905
906 sub _populate_dbh {
907   my ($self) = @_;
908
909   my @info = @{$self->_dbi_connect_info || []};
910   $self->_dbh(undef); # in case ->connected failed we might get sent here
911   $self->_server_info_hash (undef);
912   $self->_dbh($self->_connect(@info));
913
914   $self->_conn_pid($$);
915   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
916
917   $self->_determine_driver;
918
919   # Always set the transaction depth on connect, since
920   #  there is no transaction in progress by definition
921   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
922
923   $self->_run_connection_actions unless $self->{_in_determine_driver};
924 }
925
926 sub _run_connection_actions {
927   my $self = shift;
928   my @actions;
929
930   push @actions, ( $self->on_connect_call || () );
931   push @actions, $self->_parse_connect_do ('on_connect_do');
932
933   $self->_do_connection_actions(connect_call_ => $_) for @actions;
934 }
935
936 sub _server_info {
937   my $self = shift;
938
939   unless ($self->_server_info_hash) {
940
941     my %info;
942
943     my $server_version = $self->_get_server_version;
944
945     if (defined $server_version) {
946       $info{dbms_version} = $server_version;
947
948       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
949       my @verparts = split (/\./, $numeric_version);
950       if (
951         @verparts
952           &&
953         @verparts <= 3
954           &&
955         ! grep { $_ > 999 } (@verparts)
956       ) {
957         $info{normalized_dbms_version} = sprintf "%d.%03d%03d", @verparts;
958       }
959     }
960
961     $self->_server_info_hash(\%info);
962   }
963
964   return $self->_server_info_hash
965 }
966
967 sub _get_server_version {
968   eval { shift->_get_dbh->get_info(18) };
969 }
970
971 sub _determine_driver {
972   my ($self) = @_;
973
974   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
975     my $started_connected = 0;
976     local $self->{_in_determine_driver} = 1;
977
978     if (ref($self) eq __PACKAGE__) {
979       my $driver;
980       if ($self->_dbh) { # we are connected
981         $driver = $self->_dbh->{Driver}{Name};
982         $started_connected = 1;
983       } else {
984         # if connect_info is a CODEREF, we have no choice but to connect
985         if (ref $self->_dbi_connect_info->[0] &&
986             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
987           $self->_populate_dbh;
988           $driver = $self->_dbh->{Driver}{Name};
989         }
990         else {
991           # try to use dsn to not require being connected, the driver may still
992           # force a connection in _rebless to determine version
993           # (dsn may not be supplied at all if all we do is make a mock-schema)
994           my $dsn = $self->_dbi_connect_info->[0] || '';
995           ($driver) = $dsn =~ /dbi:([^:]+):/i;
996         }
997       }
998
999       if ($driver) {
1000         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1001         if ($self->load_optional_class($storage_class)) {
1002           mro::set_mro($storage_class, 'c3');
1003           bless $self, $storage_class;
1004           $self->_rebless();
1005         }
1006       }
1007     }
1008
1009     $self->_driver_determined(1);
1010
1011     $self->_init; # run driver-specific initializations
1012
1013     $self->_run_connection_actions
1014         if !$started_connected && defined $self->_dbh;
1015   }
1016 }
1017
1018 sub _do_connection_actions {
1019   my $self          = shift;
1020   my $method_prefix = shift;
1021   my $call          = shift;
1022
1023   if (not ref($call)) {
1024     my $method = $method_prefix . $call;
1025     $self->$method(@_);
1026   } elsif (ref($call) eq 'CODE') {
1027     $self->$call(@_);
1028   } elsif (ref($call) eq 'ARRAY') {
1029     if (ref($call->[0]) ne 'ARRAY') {
1030       $self->_do_connection_actions($method_prefix, $_) for @$call;
1031     } else {
1032       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1033     }
1034   } else {
1035     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1036   }
1037
1038   return $self;
1039 }
1040
1041 sub connect_call_do_sql {
1042   my $self = shift;
1043   $self->_do_query(@_);
1044 }
1045
1046 sub disconnect_call_do_sql {
1047   my $self = shift;
1048   $self->_do_query(@_);
1049 }
1050
1051 # override in db-specific backend when necessary
1052 sub connect_call_datetime_setup { 1 }
1053
1054 sub _do_query {
1055   my ($self, $action) = @_;
1056
1057   if (ref $action eq 'CODE') {
1058     $action = $action->($self);
1059     $self->_do_query($_) foreach @$action;
1060   }
1061   else {
1062     # Most debuggers expect ($sql, @bind), so we need to exclude
1063     # the attribute hash which is the second argument to $dbh->do
1064     # furthermore the bind values are usually to be presented
1065     # as named arrayref pairs, so wrap those here too
1066     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1067     my $sql = shift @do_args;
1068     my $attrs = shift @do_args;
1069     my @bind = map { [ undef, $_ ] } @do_args;
1070
1071     $self->_query_start($sql, @bind);
1072     $self->_get_dbh->do($sql, $attrs, @do_args);
1073     $self->_query_end($sql, @bind);
1074   }
1075
1076   return $self;
1077 }
1078
1079 sub _connect {
1080   my ($self, @info) = @_;
1081
1082   $self->throw_exception("You failed to provide any connection info")
1083     if !@info;
1084
1085   my ($old_connect_via, $dbh);
1086
1087   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1088     $old_connect_via = $DBI::connect_via;
1089     $DBI::connect_via = 'connect';
1090   }
1091
1092   eval {
1093     if(ref $info[0] eq 'CODE') {
1094        $dbh = $info[0]->();
1095     }
1096     else {
1097        $dbh = DBI->connect(@info);
1098     }
1099
1100     if($dbh && !$self->unsafe) {
1101       my $weak_self = $self;
1102       Scalar::Util::weaken($weak_self);
1103       $dbh->{HandleError} = sub {
1104           if ($weak_self) {
1105             $weak_self->throw_exception("DBI Exception: $_[0]");
1106           }
1107           else {
1108             # the handler may be invoked by something totally out of
1109             # the scope of DBIC
1110             croak ("DBI Exception: $_[0]");
1111           }
1112       };
1113       $dbh->{ShowErrorStatement} = 1;
1114       $dbh->{RaiseError} = 1;
1115       $dbh->{PrintError} = 0;
1116     }
1117   };
1118
1119   $DBI::connect_via = $old_connect_via if $old_connect_via;
1120
1121   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1122     if !$dbh || $@;
1123
1124   $self->_dbh_autocommit($dbh->{AutoCommit});
1125
1126   $dbh;
1127 }
1128
1129 sub svp_begin {
1130   my ($self, $name) = @_;
1131
1132   $name = $self->_svp_generate_name
1133     unless defined $name;
1134
1135   $self->throw_exception ("You can't use savepoints outside a transaction")
1136     if $self->{transaction_depth} == 0;
1137
1138   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1139     unless $self->can('_svp_begin');
1140
1141   push @{ $self->{savepoints} }, $name;
1142
1143   $self->debugobj->svp_begin($name) if $self->debug;
1144
1145   return $self->_svp_begin($name);
1146 }
1147
1148 sub svp_release {
1149   my ($self, $name) = @_;
1150
1151   $self->throw_exception ("You can't use savepoints outside a transaction")
1152     if $self->{transaction_depth} == 0;
1153
1154   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1155     unless $self->can('_svp_release');
1156
1157   if (defined $name) {
1158     $self->throw_exception ("Savepoint '$name' does not exist")
1159       unless grep { $_ eq $name } @{ $self->{savepoints} };
1160
1161     # Dig through the stack until we find the one we are releasing.  This keeps
1162     # the stack up to date.
1163     my $svp;
1164
1165     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1166   } else {
1167     $name = pop @{ $self->{savepoints} };
1168   }
1169
1170   $self->debugobj->svp_release($name) if $self->debug;
1171
1172   return $self->_svp_release($name);
1173 }
1174
1175 sub svp_rollback {
1176   my ($self, $name) = @_;
1177
1178   $self->throw_exception ("You can't use savepoints outside a transaction")
1179     if $self->{transaction_depth} == 0;
1180
1181   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1182     unless $self->can('_svp_rollback');
1183
1184   if (defined $name) {
1185       # If they passed us a name, verify that it exists in the stack
1186       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1187           $self->throw_exception("Savepoint '$name' does not exist!");
1188       }
1189
1190       # Dig through the stack until we find the one we are releasing.  This keeps
1191       # the stack up to date.
1192       while(my $s = pop(@{ $self->{savepoints} })) {
1193           last if($s eq $name);
1194       }
1195       # Add the savepoint back to the stack, as a rollback doesn't remove the
1196       # named savepoint, only everything after it.
1197       push(@{ $self->{savepoints} }, $name);
1198   } else {
1199       # We'll assume they want to rollback to the last savepoint
1200       $name = $self->{savepoints}->[-1];
1201   }
1202
1203   $self->debugobj->svp_rollback($name) if $self->debug;
1204
1205   return $self->_svp_rollback($name);
1206 }
1207
1208 sub _svp_generate_name {
1209     my ($self) = @_;
1210
1211     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1212 }
1213
1214 sub txn_begin {
1215   my $self = shift;
1216
1217   # this means we have not yet connected and do not know the AC status
1218   # (e.g. coderef $dbh)
1219   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1220
1221   if($self->{transaction_depth} == 0) {
1222     $self->debugobj->txn_begin()
1223       if $self->debug;
1224     $self->_dbh_begin_work;
1225   }
1226   elsif ($self->auto_savepoint) {
1227     $self->svp_begin;
1228   }
1229   $self->{transaction_depth}++;
1230 }
1231
1232 sub _dbh_begin_work {
1233   my $self = shift;
1234
1235   # if the user is utilizing txn_do - good for him, otherwise we need to
1236   # ensure that the $dbh is healthy on BEGIN.
1237   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1238   # will be replaced by a failure of begin_work itself (which will be
1239   # then retried on reconnect)
1240   if ($self->{_in_dbh_do}) {
1241     $self->_dbh->begin_work;
1242   } else {
1243     $self->dbh_do(sub { $_[1]->begin_work });
1244   }
1245 }
1246
1247 sub txn_commit {
1248   my $self = shift;
1249   if ($self->{transaction_depth} == 1) {
1250     $self->debugobj->txn_commit()
1251       if ($self->debug);
1252     $self->_dbh_commit;
1253     $self->{transaction_depth} = 0
1254       if $self->_dbh_autocommit;
1255   }
1256   elsif($self->{transaction_depth} > 1) {
1257     $self->{transaction_depth}--;
1258     $self->svp_release
1259       if $self->auto_savepoint;
1260   }
1261 }
1262
1263 sub _dbh_commit {
1264   my $self = shift;
1265   my $dbh  = $self->_dbh
1266     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1267   $dbh->commit;
1268 }
1269
1270 sub txn_rollback {
1271   my $self = shift;
1272   my $dbh = $self->_dbh;
1273   eval {
1274     if ($self->{transaction_depth} == 1) {
1275       $self->debugobj->txn_rollback()
1276         if ($self->debug);
1277       $self->{transaction_depth} = 0
1278         if $self->_dbh_autocommit;
1279       $self->_dbh_rollback;
1280     }
1281     elsif($self->{transaction_depth} > 1) {
1282       $self->{transaction_depth}--;
1283       if ($self->auto_savepoint) {
1284         $self->svp_rollback;
1285         $self->svp_release;
1286       }
1287     }
1288     else {
1289       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1290     }
1291   };
1292   if ($@) {
1293     my $error = $@;
1294     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1295     $error =~ /$exception_class/ and $self->throw_exception($error);
1296     # ensure that a failed rollback resets the transaction depth
1297     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1298     $self->throw_exception($error);
1299   }
1300 }
1301
1302 sub _dbh_rollback {
1303   my $self = shift;
1304   my $dbh  = $self->_dbh
1305     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1306   $dbh->rollback;
1307 }
1308
1309 # This used to be the top-half of _execute.  It was split out to make it
1310 #  easier to override in NoBindVars without duping the rest.  It takes up
1311 #  all of _execute's args, and emits $sql, @bind.
1312 sub _prep_for_execute {
1313   my ($self, $op, $extra_bind, $ident, $args) = @_;
1314
1315   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1316     $ident = $ident->from();
1317   }
1318
1319   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1320
1321   unshift(@bind,
1322     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1323       if $extra_bind;
1324   return ($sql, \@bind);
1325 }
1326
1327
1328 sub _fix_bind_params {
1329     my ($self, @bind) = @_;
1330
1331     ### Turn @bind from something like this:
1332     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1333     ### to this:
1334     ###   ( "'1'", "'1'", "'3'" )
1335     return
1336         map {
1337             if ( defined( $_ && $_->[1] ) ) {
1338                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1339             }
1340             else { q{'NULL'}; }
1341         } @bind;
1342 }
1343
1344 sub _query_start {
1345     my ( $self, $sql, @bind ) = @_;
1346
1347     if ( $self->debug ) {
1348         @bind = $self->_fix_bind_params(@bind);
1349
1350         $self->debugobj->query_start( $sql, @bind );
1351     }
1352 }
1353
1354 sub _query_end {
1355     my ( $self, $sql, @bind ) = @_;
1356
1357     if ( $self->debug ) {
1358         @bind = $self->_fix_bind_params(@bind);
1359         $self->debugobj->query_end( $sql, @bind );
1360     }
1361 }
1362
1363 sub _dbh_execute {
1364   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1365
1366   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1367
1368   $self->_query_start( $sql, @$bind );
1369
1370   my $sth = $self->sth($sql,$op);
1371
1372   my $placeholder_index = 1;
1373
1374   foreach my $bound (@$bind) {
1375     my $attributes = {};
1376     my($column_name, @data) = @$bound;
1377
1378     if ($bind_attributes) {
1379       $attributes = $bind_attributes->{$column_name}
1380       if defined $bind_attributes->{$column_name};
1381     }
1382
1383     foreach my $data (@data) {
1384       my $ref = ref $data;
1385       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1386
1387       $sth->bind_param($placeholder_index, $data, $attributes);
1388       $placeholder_index++;
1389     }
1390   }
1391
1392   # Can this fail without throwing an exception anyways???
1393   my $rv = $sth->execute();
1394   $self->throw_exception($sth->errstr) if !$rv;
1395
1396   $self->_query_end( $sql, @$bind );
1397
1398   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1399 }
1400
1401 sub _execute {
1402     my $self = shift;
1403     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1404 }
1405
1406 sub _prefetch_insert_auto_nextvals {
1407   my ($self, $source, $to_insert) = @_;
1408
1409   my $upd = {};
1410
1411   foreach my $col ( $source->columns ) {
1412     if ( !defined $to_insert->{$col} ) {
1413       my $col_info = $source->column_info($col);
1414
1415       if ( $col_info->{auto_nextval} ) {
1416         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1417           'nextval',
1418           $col_info->{sequence} ||=
1419             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1420         );
1421       }
1422     }
1423   }
1424
1425   return $upd;
1426 }
1427
1428 sub insert {
1429   my $self = shift;
1430   my ($source, $to_insert, $opts) = @_;
1431
1432   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1433
1434   my $bind_attributes = $self->source_bind_attributes($source);
1435
1436   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1437
1438   if ($opts->{returning}) {
1439     my @ret_cols = @{$opts->{returning}};
1440
1441     my @ret_vals = eval {
1442       local $SIG{__WARN__} = sub {};
1443       my @r = $sth->fetchrow_array;
1444       $sth->finish;
1445       @r;
1446     };
1447
1448     my %ret;
1449     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1450
1451     $updated_cols = {
1452       %$updated_cols,
1453       %ret,
1454     };
1455   }
1456
1457   return $updated_cols;
1458 }
1459
1460 ## Currently it is assumed that all values passed will be "normal", i.e. not
1461 ## scalar refs, or at least, all the same type as the first set, the statement is
1462 ## only prepped once.
1463 sub insert_bulk {
1464   my ($self, $source, $cols, $data) = @_;
1465
1466   my %colvalues;
1467   @colvalues{@$cols} = (0..$#$cols);
1468
1469   for my $i (0..$#$cols) {
1470     my $first_val = $data->[0][$i];
1471     next unless ref $first_val eq 'SCALAR';
1472
1473     $colvalues{ $cols->[$i] } = $first_val;
1474   }
1475
1476   # check for bad data and stringify stringifiable objects
1477   my $bad_slice = sub {
1478     my ($msg, $col_idx, $slice_idx) = @_;
1479     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1480       $msg,
1481       $cols->[$col_idx],
1482       do {
1483         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1484         Data::Dumper::Concise::Dumper({
1485           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1486         }),
1487       }
1488     );
1489   };
1490
1491   for my $datum_idx (0..$#$data) {
1492     my $datum = $data->[$datum_idx];
1493
1494     for my $col_idx (0..$#$cols) {
1495       my $val            = $datum->[$col_idx];
1496       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1497       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1498
1499       if ($is_literal_sql) {
1500         if (not ref $val) {
1501           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1502         }
1503         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1504           $bad_slice->("$reftype reference found where literal SQL expected",
1505             $col_idx, $datum_idx);
1506         }
1507         elsif ($$val ne $$sqla_bind){
1508           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1509             $col_idx, $datum_idx);
1510         }
1511       }
1512       elsif (my $reftype = ref $val) {
1513         require overload;
1514         if (overload::Method($val, '""')) {
1515           $datum->[$col_idx] = "".$val;
1516         }
1517         else {
1518           $bad_slice->("$reftype reference found where bind expected",
1519             $col_idx, $datum_idx);
1520         }
1521       }
1522     }
1523   }
1524
1525   my ($sql, $bind) = $self->_prep_for_execute (
1526     'insert', undef, $source, [\%colvalues]
1527   );
1528   my @bind = @$bind;
1529
1530   my $empty_bind = 1 if (not @bind) &&
1531     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1532
1533   if ((not @bind) && (not $empty_bind)) {
1534     $self->throw_exception(
1535       'Cannot insert_bulk without support for placeholders'
1536     );
1537   }
1538
1539   # neither _execute_array, nor _execute_inserts_with_no_binds are
1540   # atomic (even if _execute _array is a single call). Thus a safety
1541   # scope guard
1542   my $guard = $self->txn_scope_guard;
1543
1544   $self->_query_start( $sql, ['__BULK__'] );
1545   my $sth = $self->sth($sql);
1546   my $rv = do {
1547     if ($empty_bind) {
1548       # bind_param_array doesn't work if there are no binds
1549       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1550     }
1551     else {
1552 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1553       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1554     }
1555   };
1556
1557   $self->_query_end( $sql, ['__BULK__'] );
1558
1559   $guard->commit;
1560
1561   return (wantarray ? ($rv, $sth, @bind) : $rv);
1562 }
1563
1564 sub _execute_array {
1565   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1566
1567   ## This must be an arrayref, else nothing works!
1568   my $tuple_status = [];
1569
1570   ## Get the bind_attributes, if any exist
1571   my $bind_attributes = $self->source_bind_attributes($source);
1572
1573   ## Bind the values and execute
1574   my $placeholder_index = 1;
1575
1576   foreach my $bound (@$bind) {
1577
1578     my $attributes = {};
1579     my ($column_name, $data_index) = @$bound;
1580
1581     if( $bind_attributes ) {
1582       $attributes = $bind_attributes->{$column_name}
1583       if defined $bind_attributes->{$column_name};
1584     }
1585
1586     my @data = map { $_->[$data_index] } @$data;
1587
1588     $sth->bind_param_array(
1589       $placeholder_index,
1590       [@data],
1591       (%$attributes ?  $attributes : ()),
1592     );
1593     $placeholder_index++;
1594   }
1595
1596   my $rv = eval {
1597     $self->_dbh_execute_array($sth, $tuple_status, @extra);
1598   };
1599   my $err = $@ || $sth->errstr;
1600
1601 # Statement must finish even if there was an exception.
1602   eval { $sth->finish };
1603   $err = $@ unless $err;
1604
1605   if ($err) {
1606     my $i = 0;
1607     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1608
1609     $self->throw_exception("Unexpected populate error: $err")
1610       if ($i > $#$tuple_status);
1611
1612     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1613       ($tuple_status->[$i][1] || $err),
1614       Data::Dumper::Concise::Dumper({
1615         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1616       }),
1617     );
1618   }
1619   return $rv;
1620 }
1621
1622 sub _dbh_execute_array {
1623     my ($self, $sth, $tuple_status, @extra) = @_;
1624
1625     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1626 }
1627
1628 sub _dbh_execute_inserts_with_no_binds {
1629   my ($self, $sth, $count) = @_;
1630
1631   eval {
1632     my $dbh = $self->_get_dbh;
1633     local $dbh->{RaiseError} = 1;
1634     local $dbh->{PrintError} = 0;
1635
1636     $sth->execute foreach 1..$count;
1637   };
1638   my $exception = $@;
1639
1640 # Make sure statement is finished even if there was an exception.
1641   eval { $sth->finish };
1642   $exception = $@ unless $exception;
1643
1644   $self->throw_exception($exception) if $exception;
1645
1646   return $count;
1647 }
1648
1649 sub update {
1650   my ($self, $source, @args) = @_;
1651
1652   my $bind_attrs = $self->source_bind_attributes($source);
1653
1654   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1655 }
1656
1657
1658 sub delete {
1659   my ($self, $source, @args) = @_;
1660
1661   my $bind_attrs = $self->source_bind_attributes($source);
1662
1663   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1664 }
1665
1666 # We were sent here because the $rs contains a complex search
1667 # which will require a subquery to select the correct rows
1668 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1669 #
1670 # Generating a single PK column subquery is trivial and supported
1671 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1672 # Look at _multipk_update_delete()
1673 sub _subq_update_delete {
1674   my $self = shift;
1675   my ($rs, $op, $values) = @_;
1676
1677   my $rsrc = $rs->result_source;
1678
1679   # quick check if we got a sane rs on our hands
1680   my @pcols = $rsrc->_pri_cols;
1681
1682   my $sel = $rs->_resolved_attrs->{select};
1683   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1684
1685   if (
1686       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1687         ne
1688       join ("\x00", sort @$sel )
1689   ) {
1690     $self->throw_exception (
1691       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1692     );
1693   }
1694
1695   if (@pcols == 1) {
1696     return $self->$op (
1697       $rsrc,
1698       $op eq 'update' ? $values : (),
1699       { $pcols[0] => { -in => $rs->as_query } },
1700     );
1701   }
1702
1703   else {
1704     return $self->_multipk_update_delete (@_);
1705   }
1706 }
1707
1708 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1709 # resultset update/delete involving subqueries. So by default resort
1710 # to simple (and inefficient) delete_all style per-row opearations,
1711 # while allowing specific storages to override this with a faster
1712 # implementation.
1713 #
1714 sub _multipk_update_delete {
1715   return shift->_per_row_update_delete (@_);
1716 }
1717
1718 # This is the default loop used to delete/update rows for multi PK
1719 # resultsets, and used by mysql exclusively (because it can't do anything
1720 # else).
1721 #
1722 # We do not use $row->$op style queries, because resultset update/delete
1723 # is not expected to cascade (this is what delete_all/update_all is for).
1724 #
1725 # There should be no race conditions as the entire operation is rolled
1726 # in a transaction.
1727 #
1728 sub _per_row_update_delete {
1729   my $self = shift;
1730   my ($rs, $op, $values) = @_;
1731
1732   my $rsrc = $rs->result_source;
1733   my @pcols = $rsrc->_pri_cols;
1734
1735   my $guard = $self->txn_scope_guard;
1736
1737   # emulate the return value of $sth->execute for non-selects
1738   my $row_cnt = '0E0';
1739
1740   my $subrs_cur = $rs->cursor;
1741   my @all_pk = $subrs_cur->all;
1742   for my $pks ( @all_pk) {
1743
1744     my $cond;
1745     for my $i (0.. $#pcols) {
1746       $cond->{$pcols[$i]} = $pks->[$i];
1747     }
1748
1749     $self->$op (
1750       $rsrc,
1751       $op eq 'update' ? $values : (),
1752       $cond,
1753     );
1754
1755     $row_cnt++;
1756   }
1757
1758   $guard->commit;
1759
1760   return $row_cnt;
1761 }
1762
1763 sub _select {
1764   my $self = shift;
1765
1766   # localization is neccessary as
1767   # 1) there is no infrastructure to pass this around before SQLA2
1768   # 2) _select_args sets it and _prep_for_execute consumes it
1769   my $sql_maker = $self->sql_maker;
1770   local $sql_maker->{_dbic_rs_attrs};
1771
1772   return $self->_execute($self->_select_args(@_));
1773 }
1774
1775 sub _select_args_to_query {
1776   my $self = shift;
1777
1778   # localization is neccessary as
1779   # 1) there is no infrastructure to pass this around before SQLA2
1780   # 2) _select_args sets it and _prep_for_execute consumes it
1781   my $sql_maker = $self->sql_maker;
1782   local $sql_maker->{_dbic_rs_attrs};
1783
1784   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1785   #  = $self->_select_args($ident, $select, $cond, $attrs);
1786   my ($op, $bind, $ident, $bind_attrs, @args) =
1787     $self->_select_args(@_);
1788
1789   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1790   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1791   $prepared_bind ||= [];
1792
1793   return wantarray
1794     ? ($sql, $prepared_bind, $bind_attrs)
1795     : \[ "($sql)", @$prepared_bind ]
1796   ;
1797 }
1798
1799 sub _select_args {
1800   my ($self, $ident, $select, $where, $attrs) = @_;
1801
1802   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1803
1804   my $sql_maker = $self->sql_maker;
1805   $sql_maker->{_dbic_rs_attrs} = {
1806     %$attrs,
1807     select => $select,
1808     from => $ident,
1809     where => $where,
1810     $rs_alias && $alias2source->{$rs_alias}
1811       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1812       : ()
1813     ,
1814   };
1815
1816   # calculate bind_attrs before possible $ident mangling
1817   my $bind_attrs = {};
1818   for my $alias (keys %$alias2source) {
1819     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1820     for my $col (keys %$bindtypes) {
1821
1822       my $fqcn = join ('.', $alias, $col);
1823       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1824
1825       # Unqialified column names are nice, but at the same time can be
1826       # rather ambiguous. What we do here is basically go along with
1827       # the loop, adding an unqualified column slot to $bind_attrs,
1828       # alongside the fully qualified name. As soon as we encounter
1829       # another column by that name (which would imply another table)
1830       # we unset the unqualified slot and never add any info to it
1831       # to avoid erroneous type binding. If this happens the users
1832       # only choice will be to fully qualify his column name
1833
1834       if (exists $bind_attrs->{$col}) {
1835         $bind_attrs->{$col} = {};
1836       }
1837       else {
1838         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1839       }
1840     }
1841   }
1842
1843   # adjust limits
1844   if (
1845     $attrs->{software_limit}
1846       ||
1847     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1848   ) {
1849     $attrs->{software_limit} = 1;
1850   }
1851   else {
1852     $self->throw_exception("rows attribute must be positive if present")
1853       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1854
1855     # MySQL actually recommends this approach.  I cringe.
1856     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1857   }
1858
1859   my @limit;
1860
1861   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
1862   # storage, unless software limit was requested
1863   if (
1864     #limited has_many
1865     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1866        ||
1867     # limited prefetch with RNO subqueries
1868     (
1869       $attrs->{rows}
1870         &&
1871       $sql_maker->limit_dialect eq 'RowNumberOver'
1872         &&
1873       $attrs->{_prefetch_select}
1874         &&
1875       @{$attrs->{_prefetch_select}}
1876     )
1877       ||
1878     # grouped prefetch
1879     ( $attrs->{group_by}
1880         &&
1881       @{$attrs->{group_by}}
1882         &&
1883       $attrs->{_prefetch_select}
1884         &&
1885       @{$attrs->{_prefetch_select}}
1886     )
1887   ) {
1888     ($ident, $select, $where, $attrs)
1889       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1890   }
1891
1892   elsif (
1893     ($attrs->{rows} || $attrs->{offset})
1894       &&
1895     $sql_maker->limit_dialect eq 'RowNumberOver'
1896       &&
1897     (ref $ident eq 'ARRAY' && @$ident > 1)  # indicates a join
1898       &&
1899     scalar $self->_parse_order_by ($attrs->{order_by})
1900   ) {
1901     # the RNO limit dialect above mangles the SQL such that the join gets lost
1902     # wrap a subquery here
1903
1904     push @limit, delete @{$attrs}{qw/rows offset/};
1905
1906     my $subq = $self->_select_args_to_query (
1907       $ident,
1908       $select,
1909       $where,
1910       $attrs,
1911     );
1912
1913     $ident = {
1914       -alias => $attrs->{alias},
1915       -source_handle => $ident->[0]{-source_handle},
1916       $attrs->{alias} => $subq,
1917     };
1918
1919     # all part of the subquery now
1920     delete @{$attrs}{qw/order_by group_by having/};
1921     $where = undef;
1922   }
1923
1924   elsif (! $attrs->{software_limit} ) {
1925     push @limit, $attrs->{rows}, $attrs->{offset};
1926   }
1927
1928   # try to simplify the joinmap further (prune unreferenced type-single joins)
1929   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
1930
1931 ###
1932   # This would be the point to deflate anything found in $where
1933   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1934   # expect a row object. And all we have is a resultsource (it is trivial
1935   # to extract deflator coderefs via $alias2source above).
1936   #
1937   # I don't see a way forward other than changing the way deflators are
1938   # invoked, and that's just bad...
1939 ###
1940
1941   my $order = { map
1942     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1943     (qw/order_by group_by having/ )
1944   };
1945
1946   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1947 }
1948
1949 # Returns a counting SELECT for a simple count
1950 # query. Abstracted so that a storage could override
1951 # this to { count => 'firstcol' } or whatever makes
1952 # sense as a performance optimization
1953 sub _count_select {
1954   #my ($self, $source, $rs_attrs) = @_;
1955   return { count => '*' };
1956 }
1957
1958 # Returns a SELECT which will end up in the subselect
1959 # There may or may not be a group_by, as the subquery
1960 # might have been called to accomodate a limit
1961 #
1962 # Most databases would be happy with whatever ends up
1963 # here, but some choke in various ways.
1964 #
1965 sub _subq_count_select {
1966   my ($self, $source, $rs_attrs) = @_;
1967
1968   if (my $groupby = $rs_attrs->{group_by}) {
1969
1970     my $avail_columns = $self->_resolve_column_info ($rs_attrs->{from});
1971
1972     my $sel_index;
1973     for my $sel (@{$rs_attrs->{select}}) {
1974       if (ref $sel eq 'HASH' and $sel->{-as}) {
1975         $sel_index->{$sel->{-as}} = $sel;
1976       }
1977     }
1978
1979     my @selection;
1980     for my $g_part (@$groupby) {
1981       if (ref $g_part or $avail_columns->{$g_part}) {
1982         push @selection, $g_part;
1983       }
1984       elsif ($sel_index->{$g_part}) {
1985         push @selection, $sel_index->{$g_part};
1986       }
1987       else {
1988         $self->throw_exception ("group_by criteria '$g_part' not contained within current resultset source(s)");
1989       }
1990     }
1991
1992     return \@selection;
1993   }
1994
1995   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1996   return @pcols ? \@pcols : [ 1 ];
1997 }
1998
1999 sub source_bind_attributes {
2000   my ($self, $source) = @_;
2001
2002   my $bind_attributes;
2003   foreach my $column ($source->columns) {
2004
2005     my $data_type = $source->column_info($column)->{data_type} || '';
2006     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2007      if $data_type;
2008   }
2009
2010   return $bind_attributes;
2011 }
2012
2013 =head2 select
2014
2015 =over 4
2016
2017 =item Arguments: $ident, $select, $condition, $attrs
2018
2019 =back
2020
2021 Handle a SQL select statement.
2022
2023 =cut
2024
2025 sub select {
2026   my $self = shift;
2027   my ($ident, $select, $condition, $attrs) = @_;
2028   return $self->cursor_class->new($self, \@_, $attrs);
2029 }
2030
2031 sub select_single {
2032   my $self = shift;
2033   my ($rv, $sth, @bind) = $self->_select(@_);
2034   my @row = $sth->fetchrow_array;
2035   my @nextrow = $sth->fetchrow_array if @row;
2036   if(@row && @nextrow) {
2037     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2038   }
2039   # Need to call finish() to work round broken DBDs
2040   $sth->finish();
2041   return @row;
2042 }
2043
2044 =head2 sth
2045
2046 =over 4
2047
2048 =item Arguments: $sql
2049
2050 =back
2051
2052 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2053
2054 =cut
2055
2056 sub _dbh_sth {
2057   my ($self, $dbh, $sql) = @_;
2058
2059   # 3 is the if_active parameter which avoids active sth re-use
2060   my $sth = $self->disable_sth_caching
2061     ? $dbh->prepare($sql)
2062     : $dbh->prepare_cached($sql, {}, 3);
2063
2064   # XXX You would think RaiseError would make this impossible,
2065   #  but apparently that's not true :(
2066   $self->throw_exception($dbh->errstr) if !$sth;
2067
2068   $sth;
2069 }
2070
2071 sub sth {
2072   my ($self, $sql) = @_;
2073   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2074 }
2075
2076 sub _dbh_columns_info_for {
2077   my ($self, $dbh, $table) = @_;
2078
2079   if ($dbh->can('column_info')) {
2080     my %result;
2081     eval {
2082       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2083       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2084       $sth->execute();
2085       while ( my $info = $sth->fetchrow_hashref() ){
2086         my %column_info;
2087         $column_info{data_type}   = $info->{TYPE_NAME};
2088         $column_info{size}      = $info->{COLUMN_SIZE};
2089         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2090         $column_info{default_value} = $info->{COLUMN_DEF};
2091         my $col_name = $info->{COLUMN_NAME};
2092         $col_name =~ s/^\"(.*)\"$/$1/;
2093
2094         $result{$col_name} = \%column_info;
2095       }
2096     };
2097     return \%result if !$@ && scalar keys %result;
2098   }
2099
2100   my %result;
2101   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2102   $sth->execute;
2103   my @columns = @{$sth->{NAME_lc}};
2104   for my $i ( 0 .. $#columns ){
2105     my %column_info;
2106     $column_info{data_type} = $sth->{TYPE}->[$i];
2107     $column_info{size} = $sth->{PRECISION}->[$i];
2108     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2109
2110     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2111       $column_info{data_type} = $1;
2112       $column_info{size}    = $2;
2113     }
2114
2115     $result{$columns[$i]} = \%column_info;
2116   }
2117   $sth->finish;
2118
2119   foreach my $col (keys %result) {
2120     my $colinfo = $result{$col};
2121     my $type_num = $colinfo->{data_type};
2122     my $type_name;
2123     if(defined $type_num && $dbh->can('type_info')) {
2124       my $type_info = $dbh->type_info($type_num);
2125       $type_name = $type_info->{TYPE_NAME} if $type_info;
2126       $colinfo->{data_type} = $type_name if $type_name;
2127     }
2128   }
2129
2130   return \%result;
2131 }
2132
2133 sub columns_info_for {
2134   my ($self, $table) = @_;
2135   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2136 }
2137
2138 =head2 last_insert_id
2139
2140 Return the row id of the last insert.
2141
2142 =cut
2143
2144 sub _dbh_last_insert_id {
2145     my ($self, $dbh, $source, $col) = @_;
2146
2147     my $id = eval { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2148
2149     return $id if defined $id;
2150
2151     my $class = ref $self;
2152     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2153 }
2154
2155 sub last_insert_id {
2156   my $self = shift;
2157   $self->_dbh_last_insert_id ($self->_dbh, @_);
2158 }
2159
2160 =head2 _native_data_type
2161
2162 =over 4
2163
2164 =item Arguments: $type_name
2165
2166 =back
2167
2168 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2169 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2170 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2171
2172 The default implementation returns C<undef>, implement in your Storage driver if
2173 you need this functionality.
2174
2175 Should map types from other databases to the native RDBMS type, for example
2176 C<VARCHAR2> to C<VARCHAR>.
2177
2178 Types with modifiers should map to the underlying data type. For example,
2179 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2180
2181 Composite types should map to the container type, for example
2182 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2183
2184 =cut
2185
2186 sub _native_data_type {
2187   #my ($self, $data_type) = @_;
2188   return undef
2189 }
2190
2191 # Check if placeholders are supported at all
2192 sub _placeholders_supported {
2193   my $self = shift;
2194   my $dbh  = $self->_get_dbh;
2195
2196   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2197   # but it is inaccurate more often than not
2198   eval {
2199     local $dbh->{PrintError} = 0;
2200     local $dbh->{RaiseError} = 1;
2201     $dbh->do('select ?', {}, 1);
2202   };
2203   return $@ ? 0 : 1;
2204 }
2205
2206 # Check if placeholders bound to non-string types throw exceptions
2207 #
2208 sub _typeless_placeholders_supported {
2209   my $self = shift;
2210   my $dbh  = $self->_get_dbh;
2211
2212   eval {
2213     local $dbh->{PrintError} = 0;
2214     local $dbh->{RaiseError} = 1;
2215     # this specifically tests a bind that is NOT a string
2216     $dbh->do('select 1 where 1 = ?', {}, 1);
2217   };
2218   return $@ ? 0 : 1;
2219 }
2220
2221 =head2 sqlt_type
2222
2223 Returns the database driver name.
2224
2225 =cut
2226
2227 sub sqlt_type {
2228   shift->_get_dbh->{Driver}->{Name};
2229 }
2230
2231 =head2 bind_attribute_by_data_type
2232
2233 Given a datatype from column info, returns a database specific bind
2234 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2235 let the database planner just handle it.
2236
2237 Generally only needed for special case column types, like bytea in postgres.
2238
2239 =cut
2240
2241 sub bind_attribute_by_data_type {
2242     return;
2243 }
2244
2245 =head2 is_datatype_numeric
2246
2247 Given a datatype from column_info, returns a boolean value indicating if
2248 the current RDBMS considers it a numeric value. This controls how
2249 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2250 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2251 be performed instead of the usual C<eq>.
2252
2253 =cut
2254
2255 sub is_datatype_numeric {
2256   my ($self, $dt) = @_;
2257
2258   return 0 unless $dt;
2259
2260   return $dt =~ /^ (?:
2261     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2262   ) $/ix;
2263 }
2264
2265
2266 =head2 create_ddl_dir
2267
2268 =over 4
2269
2270 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2271
2272 =back
2273
2274 Creates a SQL file based on the Schema, for each of the specified
2275 database engines in C<\@databases> in the given directory.
2276 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2277
2278 Given a previous version number, this will also create a file containing
2279 the ALTER TABLE statements to transform the previous schema into the
2280 current one. Note that these statements may contain C<DROP TABLE> or
2281 C<DROP COLUMN> statements that can potentially destroy data.
2282
2283 The file names are created using the C<ddl_filename> method below, please
2284 override this method in your schema if you would like a different file
2285 name format. For the ALTER file, the same format is used, replacing
2286 $version in the name with "$preversion-$version".
2287
2288 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2289 The most common value for this would be C<< { add_drop_table => 1 } >>
2290 to have the SQL produced include a C<DROP TABLE> statement for each table
2291 created. For quoting purposes supply C<quote_table_names> and
2292 C<quote_field_names>.
2293
2294 If no arguments are passed, then the following default values are assumed:
2295
2296 =over 4
2297
2298 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2299
2300 =item version    - $schema->schema_version
2301
2302 =item directory  - './'
2303
2304 =item preversion - <none>
2305
2306 =back
2307
2308 By default, C<\%sqlt_args> will have
2309
2310  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2311
2312 merged with the hash passed in. To disable any of those features, pass in a
2313 hashref like the following
2314
2315  { ignore_constraint_names => 0, # ... other options }
2316
2317
2318 WARNING: You are strongly advised to check all SQL files created, before applying
2319 them.
2320
2321 =cut
2322
2323 sub create_ddl_dir {
2324   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2325
2326   unless ($dir) {
2327     carp "No directory given, using ./\n";
2328     $dir = './';
2329   }
2330
2331   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2332
2333   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2334   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2335
2336   my $schema_version = $schema->schema_version || '1.x';
2337   $version ||= $schema_version;
2338
2339   $sqltargs = {
2340     add_drop_table => 1,
2341     ignore_constraint_names => 1,
2342     ignore_index_names => 1,
2343     %{$sqltargs || {}}
2344   };
2345
2346   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2347     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2348   }
2349
2350   my $sqlt = SQL::Translator->new( $sqltargs );
2351
2352   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2353   my $sqlt_schema = $sqlt->translate({ data => $schema })
2354     or $self->throw_exception ($sqlt->error);
2355
2356   foreach my $db (@$databases) {
2357     $sqlt->reset();
2358     $sqlt->{schema} = $sqlt_schema;
2359     $sqlt->producer($db);
2360
2361     my $file;
2362     my $filename = $schema->ddl_filename($db, $version, $dir);
2363     if (-e $filename && ($version eq $schema_version )) {
2364       # if we are dumping the current version, overwrite the DDL
2365       carp "Overwriting existing DDL file - $filename";
2366       unlink($filename);
2367     }
2368
2369     my $output = $sqlt->translate;
2370     if(!$output) {
2371       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2372       next;
2373     }
2374     if(!open($file, ">$filename")) {
2375       $self->throw_exception("Can't open $filename for writing ($!)");
2376       next;
2377     }
2378     print $file $output;
2379     close($file);
2380
2381     next unless ($preversion);
2382
2383     require SQL::Translator::Diff;
2384
2385     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2386     if(!-e $prefilename) {
2387       carp("No previous schema file found ($prefilename)");
2388       next;
2389     }
2390
2391     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2392     if(-e $difffile) {
2393       carp("Overwriting existing diff file - $difffile");
2394       unlink($difffile);
2395     }
2396
2397     my $source_schema;
2398     {
2399       my $t = SQL::Translator->new($sqltargs);
2400       $t->debug( 0 );
2401       $t->trace( 0 );
2402
2403       $t->parser( $db )
2404         or $self->throw_exception ($t->error);
2405
2406       my $out = $t->translate( $prefilename )
2407         or $self->throw_exception ($t->error);
2408
2409       $source_schema = $t->schema;
2410
2411       $source_schema->name( $prefilename )
2412         unless ( $source_schema->name );
2413     }
2414
2415     # The "new" style of producers have sane normalization and can support
2416     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2417     # And we have to diff parsed SQL against parsed SQL.
2418     my $dest_schema = $sqlt_schema;
2419
2420     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2421       my $t = SQL::Translator->new($sqltargs);
2422       $t->debug( 0 );
2423       $t->trace( 0 );
2424
2425       $t->parser( $db )
2426         or $self->throw_exception ($t->error);
2427
2428       my $out = $t->translate( $filename )
2429         or $self->throw_exception ($t->error);
2430
2431       $dest_schema = $t->schema;
2432
2433       $dest_schema->name( $filename )
2434         unless $dest_schema->name;
2435     }
2436
2437     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2438                                                   $dest_schema,   $db,
2439                                                   $sqltargs
2440                                                  );
2441     if(!open $file, ">$difffile") {
2442       $self->throw_exception("Can't write to $difffile ($!)");
2443       next;
2444     }
2445     print $file $diff;
2446     close($file);
2447   }
2448 }
2449
2450 =head2 deployment_statements
2451
2452 =over 4
2453
2454 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2455
2456 =back
2457
2458 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2459
2460 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2461 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2462
2463 C<$directory> is used to return statements from files in a previously created
2464 L</create_ddl_dir> directory and is optional. The filenames are constructed
2465 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2466
2467 If no C<$directory> is specified then the statements are constructed on the
2468 fly using L<SQL::Translator> and C<$version> is ignored.
2469
2470 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2471
2472 =cut
2473
2474 sub deployment_statements {
2475   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2476   $type ||= $self->sqlt_type;
2477   $version ||= $schema->schema_version || '1.x';
2478   $dir ||= './';
2479   my $filename = $schema->ddl_filename($type, $version, $dir);
2480   if(-f $filename)
2481   {
2482       my $file;
2483       open($file, "<$filename")
2484         or $self->throw_exception("Can't open $filename ($!)");
2485       my @rows = <$file>;
2486       close($file);
2487       return join('', @rows);
2488   }
2489
2490   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2491     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2492   }
2493
2494   # sources needs to be a parser arg, but for simplicty allow at top level
2495   # coming in
2496   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2497       if exists $sqltargs->{sources};
2498
2499   my $tr = SQL::Translator->new(
2500     producer => "SQL::Translator::Producer::${type}",
2501     %$sqltargs,
2502     parser => 'SQL::Translator::Parser::DBIx::Class',
2503     data => $schema,
2504   );
2505
2506   my @ret;
2507   my $wa = wantarray;
2508   if ($wa) {
2509     @ret = $tr->translate;
2510   }
2511   else {
2512     $ret[0] = $tr->translate;
2513   }
2514
2515   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2516     unless (@ret && defined $ret[0]);
2517
2518   return $wa ? @ret : $ret[0];
2519 }
2520
2521 sub deploy {
2522   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2523   my $deploy = sub {
2524     my $line = shift;
2525     return if($line =~ /^--/);
2526     return if(!$line);
2527     # next if($line =~ /^DROP/m);
2528     return if($line =~ /^BEGIN TRANSACTION/m);
2529     return if($line =~ /^COMMIT/m);
2530     return if $line =~ /^\s+$/; # skip whitespace only
2531     $self->_query_start($line);
2532     eval {
2533       # do a dbh_do cycle here, as we need some error checking in
2534       # place (even though we will ignore errors)
2535       $self->dbh_do (sub { $_[1]->do($line) });
2536     };
2537     if ($@) {
2538       carp qq{$@ (running "${line}")};
2539     }
2540     $self->_query_end($line);
2541   };
2542   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2543   if (@statements > 1) {
2544     foreach my $statement (@statements) {
2545       $deploy->( $statement );
2546     }
2547   }
2548   elsif (@statements == 1) {
2549     foreach my $line ( split(";\n", $statements[0])) {
2550       $deploy->( $line );
2551     }
2552   }
2553 }
2554
2555 =head2 datetime_parser
2556
2557 Returns the datetime parser class
2558
2559 =cut
2560
2561 sub datetime_parser {
2562   my $self = shift;
2563   return $self->{datetime_parser} ||= do {
2564     $self->build_datetime_parser(@_);
2565   };
2566 }
2567
2568 =head2 datetime_parser_type
2569
2570 Defines (returns) the datetime parser class - currently hardwired to
2571 L<DateTime::Format::MySQL>
2572
2573 =cut
2574
2575 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2576
2577 =head2 build_datetime_parser
2578
2579 See L</datetime_parser>
2580
2581 =cut
2582
2583 sub build_datetime_parser {
2584   my $self = shift;
2585   my $type = $self->datetime_parser_type(@_);
2586   $self->ensure_class_loaded ($type);
2587   return $type;
2588 }
2589
2590
2591 =head2 is_replicating
2592
2593 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2594 replicate from a master database.  Default is undef, which is the result
2595 returned by databases that don't support replication.
2596
2597 =cut
2598
2599 sub is_replicating {
2600     return;
2601
2602 }
2603
2604 =head2 lag_behind_master
2605
2606 Returns a number that represents a certain amount of lag behind a master db
2607 when a given storage is replicating.  The number is database dependent, but
2608 starts at zero and increases with the amount of lag. Default in undef
2609
2610 =cut
2611
2612 sub lag_behind_master {
2613     return;
2614 }
2615
2616 =head2 relname_to_table_alias
2617
2618 =over 4
2619
2620 =item Arguments: $relname, $join_count
2621
2622 =back
2623
2624 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2625 queries.
2626
2627 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2628 way these aliases are named.
2629
2630 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2631 otherwise C<"$relname">.
2632
2633 =cut
2634
2635 sub relname_to_table_alias {
2636   my ($self, $relname, $join_count) = @_;
2637
2638   my $alias = ($join_count && $join_count > 1 ?
2639     join('_', $relname, $join_count) : $relname);
2640
2641   return $alias;
2642 }
2643
2644 sub DESTROY {
2645   my $self = shift;
2646
2647   $self->_verify_pid if $self->_dbh;
2648
2649   # some databases need this to stop spewing warnings
2650   if (my $dbh = $self->_dbh) {
2651     local $@;
2652     eval {
2653       %{ $dbh->{CachedKids} } = ();
2654       $dbh->disconnect;
2655     };
2656   }
2657
2658   $self->_dbh(undef);
2659 }
2660
2661 1;
2662
2663 =head1 USAGE NOTES
2664
2665 =head2 DBIx::Class and AutoCommit
2666
2667 DBIx::Class can do some wonderful magic with handling exceptions,
2668 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2669 (the default) combined with C<txn_do> for transaction support.
2670
2671 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2672 in an assumed transaction between commits, and you're telling us you'd
2673 like to manage that manually.  A lot of the magic protections offered by
2674 this module will go away.  We can't protect you from exceptions due to database
2675 disconnects because we don't know anything about how to restart your
2676 transactions.  You're on your own for handling all sorts of exceptional
2677 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2678 be with raw DBI.
2679
2680
2681 =head1 AUTHORS
2682
2683 Matt S. Trout <mst@shadowcatsystems.co.uk>
2684
2685 Andy Grundman <andy@hybridized.org>
2686
2687 =head1 LICENSE
2688
2689 You may distribute this code under the same terms as Perl itself.
2690
2691 =cut