68593da26ed9609ee9148fadc686d85fd39def71
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17 use Sub::Name ();
18
19 use File::Path ();
20
21 __PACKAGE__->mk_group_accessors('simple' =>
22   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
23      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints
24      _server_info_hash/
25 );
26
27 # the values for these accessors are picked out (and deleted) from
28 # the attribute hashref passed to connect_info
29 my @storage_options = qw/
30   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
31   disable_sth_caching unsafe auto_savepoint
32 /;
33 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
34
35
36 # default cursor class, overridable in connect_info attributes
37 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
38
39 __PACKAGE__->mk_group_accessors('inherited' => qw/
40   sql_maker_class
41   _supports_insert_returning
42 /);
43 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
44
45 # Each of these methods need _determine_driver called before itself
46 # in order to function reliably. This is a purely DRY optimization
47 my @rdbms_specific_methods = qw/
48   deployment_statements
49   sqlt_type
50   build_datetime_parser
51   datetime_parser_type
52
53   insert
54   insert_bulk
55   update
56   delete
57   select
58   select_single
59 /;
60
61 for my $meth (@rdbms_specific_methods) {
62
63   my $orig = __PACKAGE__->can ($meth)
64     or next;
65
66   no strict qw/refs/;
67   no warnings qw/redefine/;
68   *{__PACKAGE__ ."::$meth"} = Sub::Name::subname $meth => sub {
69     if (not $_[0]->_driver_determined) {
70       $_[0]->_determine_driver;
71       goto $_[0]->can($meth);
72     }
73     $orig->(@_);
74   };
75 }
76
77
78 =head1 NAME
79
80 DBIx::Class::Storage::DBI - DBI storage handler
81
82 =head1 SYNOPSIS
83
84   my $schema = MySchema->connect('dbi:SQLite:my.db');
85
86   $schema->storage->debug(1);
87
88   my @stuff = $schema->storage->dbh_do(
89     sub {
90       my ($storage, $dbh, @args) = @_;
91       $dbh->do("DROP TABLE authors");
92     },
93     @column_list
94   );
95
96   $schema->resultset('Book')->search({
97      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
98   });
99
100 =head1 DESCRIPTION
101
102 This class represents the connection to an RDBMS via L<DBI>.  See
103 L<DBIx::Class::Storage> for general information.  This pod only
104 documents DBI-specific methods and behaviors.
105
106 =head1 METHODS
107
108 =cut
109
110 sub new {
111   my $new = shift->next::method(@_);
112
113   $new->transaction_depth(0);
114   $new->_sql_maker_opts({});
115   $new->{savepoints} = [];
116   $new->{_in_dbh_do} = 0;
117   $new->{_dbh_gen} = 0;
118
119   $new;
120 }
121
122 =head2 connect_info
123
124 This method is normally called by L<DBIx::Class::Schema/connection>, which
125 encapsulates its argument list in an arrayref before passing them here.
126
127 The argument list may contain:
128
129 =over
130
131 =item *
132
133 The same 4-element argument set one would normally pass to
134 L<DBI/connect>, optionally followed by
135 L<extra attributes|/DBIx::Class specific connection attributes>
136 recognized by DBIx::Class:
137
138   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
139
140 =item *
141
142 A single code reference which returns a connected
143 L<DBI database handle|DBI/connect> optionally followed by
144 L<extra attributes|/DBIx::Class specific connection attributes> recognized
145 by DBIx::Class:
146
147   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
148
149 =item *
150
151 A single hashref with all the attributes and the dsn/user/password
152 mixed together:
153
154   $connect_info_args = [{
155     dsn => $dsn,
156     user => $user,
157     password => $pass,
158     %dbi_attributes,
159     %extra_attributes,
160   }];
161
162   $connect_info_args = [{
163     dbh_maker => sub { DBI->connect (...) },
164     %dbi_attributes,
165     %extra_attributes,
166   }];
167
168 This is particularly useful for L<Catalyst> based applications, allowing the
169 following config (L<Config::General> style):
170
171   <Model::DB>
172     schema_class   App::DB
173     <connect_info>
174       dsn          dbi:mysql:database=test
175       user         testuser
176       password     TestPass
177       AutoCommit   1
178     </connect_info>
179   </Model::DB>
180
181 The C<dsn>/C<user>/C<password> combination can be substituted by the
182 C<dbh_maker> key whose value is a coderef that returns a connected
183 L<DBI database handle|DBI/connect>
184
185 =back
186
187 Please note that the L<DBI> docs recommend that you always explicitly
188 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
189 recommends that it be set to I<1>, and that you perform transactions
190 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
191 to I<1> if you do not do explicitly set it to zero.  This is the default
192 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
193
194 =head3 DBIx::Class specific connection attributes
195
196 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
197 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
198 the following connection options. These options can be mixed in with your other
199 L<DBI> connection attributes, or placed in a separate hashref
200 (C<\%extra_attributes>) as shown above.
201
202 Every time C<connect_info> is invoked, any previous settings for
203 these options will be cleared before setting the new ones, regardless of
204 whether any options are specified in the new C<connect_info>.
205
206
207 =over
208
209 =item on_connect_do
210
211 Specifies things to do immediately after connecting or re-connecting to
212 the database.  Its value may contain:
213
214 =over
215
216 =item a scalar
217
218 This contains one SQL statement to execute.
219
220 =item an array reference
221
222 This contains SQL statements to execute in order.  Each element contains
223 a string or a code reference that returns a string.
224
225 =item a code reference
226
227 This contains some code to execute.  Unlike code references within an
228 array reference, its return value is ignored.
229
230 =back
231
232 =item on_disconnect_do
233
234 Takes arguments in the same form as L</on_connect_do> and executes them
235 immediately before disconnecting from the database.
236
237 Note, this only runs if you explicitly call L</disconnect> on the
238 storage object.
239
240 =item on_connect_call
241
242 A more generalized form of L</on_connect_do> that calls the specified
243 C<connect_call_METHOD> methods in your storage driver.
244
245   on_connect_do => 'select 1'
246
247 is equivalent to:
248
249   on_connect_call => [ [ do_sql => 'select 1' ] ]
250
251 Its values may contain:
252
253 =over
254
255 =item a scalar
256
257 Will call the C<connect_call_METHOD> method.
258
259 =item a code reference
260
261 Will execute C<< $code->($storage) >>
262
263 =item an array reference
264
265 Each value can be a method name or code reference.
266
267 =item an array of arrays
268
269 For each array, the first item is taken to be the C<connect_call_> method name
270 or code reference, and the rest are parameters to it.
271
272 =back
273
274 Some predefined storage methods you may use:
275
276 =over
277
278 =item do_sql
279
280 Executes a SQL string or a code reference that returns a SQL string. This is
281 what L</on_connect_do> and L</on_disconnect_do> use.
282
283 It can take:
284
285 =over
286
287 =item a scalar
288
289 Will execute the scalar as SQL.
290
291 =item an arrayref
292
293 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
294 attributes hashref and bind values.
295
296 =item a code reference
297
298 Will execute C<< $code->($storage) >> and execute the return array refs as
299 above.
300
301 =back
302
303 =item datetime_setup
304
305 Execute any statements necessary to initialize the database session to return
306 and accept datetime/timestamp values used with
307 L<DBIx::Class::InflateColumn::DateTime>.
308
309 Only necessary for some databases, see your specific storage driver for
310 implementation details.
311
312 =back
313
314 =item on_disconnect_call
315
316 Takes arguments in the same form as L</on_connect_call> and executes them
317 immediately before disconnecting from the database.
318
319 Calls the C<disconnect_call_METHOD> methods as opposed to the
320 C<connect_call_METHOD> methods called by L</on_connect_call>.
321
322 Note, this only runs if you explicitly call L</disconnect> on the
323 storage object.
324
325 =item disable_sth_caching
326
327 If set to a true value, this option will disable the caching of
328 statement handles via L<DBI/prepare_cached>.
329
330 =item limit_dialect
331
332 Sets the limit dialect. This is useful for JDBC-bridge among others
333 where the remote SQL-dialect cannot be determined by the name of the
334 driver alone. See also L<SQL::Abstract::Limit>.
335
336 =item quote_char
337
338 Specifies what characters to use to quote table and column names. If
339 you use this you will want to specify L</name_sep> as well.
340
341 C<quote_char> expects either a single character, in which case is it
342 is placed on either side of the table/column name, or an arrayref of length
343 2 in which case the table/column name is placed between the elements.
344
345 For example under MySQL you should use C<< quote_char => '`' >>, and for
346 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
347
348 =item name_sep
349
350 This only needs to be used in conjunction with C<quote_char>, and is used to
351 specify the character that separates elements (schemas, tables, columns) from
352 each other. In most cases this is simply a C<.>.
353
354 The consequences of not supplying this value is that L<SQL::Abstract>
355 will assume DBIx::Class' uses of aliases to be complete column
356 names. The output will look like I<"me.name"> when it should actually
357 be I<"me"."name">.
358
359 =item unsafe
360
361 This Storage driver normally installs its own C<HandleError>, sets
362 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
363 all database handles, including those supplied by a coderef.  It does this
364 so that it can have consistent and useful error behavior.
365
366 If you set this option to a true value, Storage will not do its usual
367 modifications to the database handle's attributes, and instead relies on
368 the settings in your connect_info DBI options (or the values you set in
369 your connection coderef, in the case that you are connecting via coderef).
370
371 Note that your custom settings can cause Storage to malfunction,
372 especially if you set a C<HandleError> handler that suppresses exceptions
373 and/or disable C<RaiseError>.
374
375 =item auto_savepoint
376
377 If this option is true, L<DBIx::Class> will use savepoints when nesting
378 transactions, making it possible to recover from failure in the inner
379 transaction without having to abort all outer transactions.
380
381 =item cursor_class
382
383 Use this argument to supply a cursor class other than the default
384 L<DBIx::Class::Storage::DBI::Cursor>.
385
386 =back
387
388 Some real-life examples of arguments to L</connect_info> and
389 L<DBIx::Class::Schema/connect>
390
391   # Simple SQLite connection
392   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
393
394   # Connect via subref
395   ->connect_info([ sub { DBI->connect(...) } ]);
396
397   # Connect via subref in hashref
398   ->connect_info([{
399     dbh_maker => sub { DBI->connect(...) },
400     on_connect_do => 'alter session ...',
401   }]);
402
403   # A bit more complicated
404   ->connect_info(
405     [
406       'dbi:Pg:dbname=foo',
407       'postgres',
408       'my_pg_password',
409       { AutoCommit => 1 },
410       { quote_char => q{"}, name_sep => q{.} },
411     ]
412   );
413
414   # Equivalent to the previous example
415   ->connect_info(
416     [
417       'dbi:Pg:dbname=foo',
418       'postgres',
419       'my_pg_password',
420       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
421     ]
422   );
423
424   # Same, but with hashref as argument
425   # See parse_connect_info for explanation
426   ->connect_info(
427     [{
428       dsn         => 'dbi:Pg:dbname=foo',
429       user        => 'postgres',
430       password    => 'my_pg_password',
431       AutoCommit  => 1,
432       quote_char  => q{"},
433       name_sep    => q{.},
434     }]
435   );
436
437   # Subref + DBIx::Class-specific connection options
438   ->connect_info(
439     [
440       sub { DBI->connect(...) },
441       {
442           quote_char => q{`},
443           name_sep => q{@},
444           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
445           disable_sth_caching => 1,
446       },
447     ]
448   );
449
450
451
452 =cut
453
454 sub connect_info {
455   my ($self, $info) = @_;
456
457   return $self->_connect_info if !$info;
458
459   $self->_connect_info($info); # copy for _connect_info
460
461   $info = $self->_normalize_connect_info($info)
462     if ref $info eq 'ARRAY';
463
464   for my $storage_opt (keys %{ $info->{storage_options} }) {
465     my $value = $info->{storage_options}{$storage_opt};
466
467     $self->$storage_opt($value);
468   }
469
470   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
471   #  the new set of options
472   $self->_sql_maker(undef);
473   $self->_sql_maker_opts({});
474
475   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
476     my $value = $info->{sql_maker_options}{$sql_maker_opt};
477
478     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
479   }
480
481   my %attrs = (
482     %{ $self->_default_dbi_connect_attributes || {} },
483     %{ $info->{attributes} || {} },
484   );
485
486   my @args = @{ $info->{arguments} };
487
488   $self->_dbi_connect_info([@args,
489     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
490
491   return $self->_connect_info;
492 }
493
494 sub _normalize_connect_info {
495   my ($self, $info_arg) = @_;
496   my %info;
497
498   my @args = @$info_arg;  # take a shallow copy for further mutilation
499
500   # combine/pre-parse arguments depending on invocation style
501
502   my %attrs;
503   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
504     %attrs = %{ $args[1] || {} };
505     @args = $args[0];
506   }
507   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
508     %attrs = %{$args[0]};
509     @args = ();
510     if (my $code = delete $attrs{dbh_maker}) {
511       @args = $code;
512
513       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
514       if (@ignored) {
515         carp sprintf (
516             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
517           . "to the result of 'dbh_maker'",
518
519           join (', ', map { "'$_'" } (@ignored) ),
520         );
521       }
522     }
523     else {
524       @args = delete @attrs{qw/dsn user password/};
525     }
526   }
527   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
528     %attrs = (
529       % { $args[3] || {} },
530       % { $args[4] || {} },
531     );
532     @args = @args[0,1,2];
533   }
534
535   $info{arguments} = \@args;
536
537   my @storage_opts = grep exists $attrs{$_},
538     @storage_options, 'cursor_class';
539
540   @{ $info{storage_options} }{@storage_opts} =
541     delete @attrs{@storage_opts} if @storage_opts;
542
543   my @sql_maker_opts = grep exists $attrs{$_},
544     qw/limit_dialect quote_char name_sep/;
545
546   @{ $info{sql_maker_options} }{@sql_maker_opts} =
547     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
548
549   $info{attributes} = \%attrs if %attrs;
550
551   return \%info;
552 }
553
554 sub _default_dbi_connect_attributes {
555   return {
556     AutoCommit => 1,
557     RaiseError => 1,
558     PrintError => 0,
559   };
560 }
561
562 =head2 on_connect_do
563
564 This method is deprecated in favour of setting via L</connect_info>.
565
566 =cut
567
568 =head2 on_disconnect_do
569
570 This method is deprecated in favour of setting via L</connect_info>.
571
572 =cut
573
574 sub _parse_connect_do {
575   my ($self, $type) = @_;
576
577   my $val = $self->$type;
578   return () if not defined $val;
579
580   my @res;
581
582   if (not ref($val)) {
583     push @res, [ 'do_sql', $val ];
584   } elsif (ref($val) eq 'CODE') {
585     push @res, $val;
586   } elsif (ref($val) eq 'ARRAY') {
587     push @res, map { [ 'do_sql', $_ ] } @$val;
588   } else {
589     $self->throw_exception("Invalid type for $type: ".ref($val));
590   }
591
592   return \@res;
593 }
594
595 =head2 dbh_do
596
597 Arguments: ($subref | $method_name), @extra_coderef_args?
598
599 Execute the given $subref or $method_name using the new exception-based
600 connection management.
601
602 The first two arguments will be the storage object that C<dbh_do> was called
603 on and a database handle to use.  Any additional arguments will be passed
604 verbatim to the called subref as arguments 2 and onwards.
605
606 Using this (instead of $self->_dbh or $self->dbh) ensures correct
607 exception handling and reconnection (or failover in future subclasses).
608
609 Your subref should have no side-effects outside of the database, as
610 there is the potential for your subref to be partially double-executed
611 if the database connection was stale/dysfunctional.
612
613 Example:
614
615   my @stuff = $schema->storage->dbh_do(
616     sub {
617       my ($storage, $dbh, @cols) = @_;
618       my $cols = join(q{, }, @cols);
619       $dbh->selectrow_array("SELECT $cols FROM foo");
620     },
621     @column_list
622   );
623
624 =cut
625
626 sub dbh_do {
627   my $self = shift;
628   my $code = shift;
629
630   my $dbh = $self->_get_dbh;
631
632   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
633       || $self->{transaction_depth};
634
635   local $self->{_in_dbh_do} = 1;
636
637   my @result;
638   my $want_array = wantarray;
639
640   eval {
641
642     if($want_array) {
643         @result = $self->$code($dbh, @_);
644     }
645     elsif(defined $want_array) {
646         $result[0] = $self->$code($dbh, @_);
647     }
648     else {
649         $self->$code($dbh, @_);
650     }
651   };
652
653   # ->connected might unset $@ - copy
654   my $exception = $@;
655   if(!$exception) { return $want_array ? @result : $result[0] }
656
657   $self->throw_exception($exception) if $self->connected;
658
659   # We were not connected - reconnect and retry, but let any
660   #  exception fall right through this time
661   carp "Retrying $code after catching disconnected exception: $exception"
662     if $ENV{DBIC_DBIRETRY_DEBUG};
663   $self->_populate_dbh;
664   $self->$code($self->_dbh, @_);
665 }
666
667 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
668 # It also informs dbh_do to bypass itself while under the direction of txn_do,
669 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
670 sub txn_do {
671   my $self = shift;
672   my $coderef = shift;
673
674   ref $coderef eq 'CODE' or $self->throw_exception
675     ('$coderef must be a CODE reference');
676
677   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
678
679   local $self->{_in_dbh_do} = 1;
680
681   my @result;
682   my $want_array = wantarray;
683
684   my $tried = 0;
685   while(1) {
686     eval {
687       $self->_get_dbh;
688
689       $self->txn_begin;
690       if($want_array) {
691           @result = $coderef->(@_);
692       }
693       elsif(defined $want_array) {
694           $result[0] = $coderef->(@_);
695       }
696       else {
697           $coderef->(@_);
698       }
699       $self->txn_commit;
700     };
701
702     # ->connected might unset $@ - copy
703     my $exception = $@;
704     if(!$exception) { return $want_array ? @result : $result[0] }
705
706     if($tried++ || $self->connected) {
707       eval { $self->txn_rollback };
708       my $rollback_exception = $@;
709       if($rollback_exception) {
710         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
711         $self->throw_exception($exception)  # propagate nested rollback
712           if $rollback_exception =~ /$exception_class/;
713
714         $self->throw_exception(
715           "Transaction aborted: ${exception}. "
716           . "Rollback failed: ${rollback_exception}"
717         );
718       }
719       $self->throw_exception($exception)
720     }
721
722     # We were not connected, and was first try - reconnect and retry
723     # via the while loop
724     carp "Retrying $coderef after catching disconnected exception: $exception"
725       if $ENV{DBIC_DBIRETRY_DEBUG};
726     $self->_populate_dbh;
727   }
728 }
729
730 =head2 disconnect
731
732 Our C<disconnect> method also performs a rollback first if the
733 database is not in C<AutoCommit> mode.
734
735 =cut
736
737 sub disconnect {
738   my ($self) = @_;
739
740   if( $self->_dbh ) {
741     my @actions;
742
743     push @actions, ( $self->on_disconnect_call || () );
744     push @actions, $self->_parse_connect_do ('on_disconnect_do');
745
746     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
747
748     $self->_dbh_rollback unless $self->_dbh_autocommit;
749
750     %{ $self->_dbh->{CachedKids} } = ();
751     $self->_dbh->disconnect;
752     $self->_dbh(undef);
753     $self->{_dbh_gen}++;
754   }
755 }
756
757 =head2 with_deferred_fk_checks
758
759 =over 4
760
761 =item Arguments: C<$coderef>
762
763 =item Return Value: The return value of $coderef
764
765 =back
766
767 Storage specific method to run the code ref with FK checks deferred or
768 in MySQL's case disabled entirely.
769
770 =cut
771
772 # Storage subclasses should override this
773 sub with_deferred_fk_checks {
774   my ($self, $sub) = @_;
775   $sub->();
776 }
777
778 =head2 connected
779
780 =over
781
782 =item Arguments: none
783
784 =item Return Value: 1|0
785
786 =back
787
788 Verifies that the current database handle is active and ready to execute
789 an SQL statement (e.g. the connection did not get stale, server is still
790 answering, etc.) This method is used internally by L</dbh>.
791
792 =cut
793
794 sub connected {
795   my $self = shift;
796   return 0 unless $self->_seems_connected;
797
798   #be on the safe side
799   local $self->_dbh->{RaiseError} = 1;
800
801   return $self->_ping;
802 }
803
804 sub _seems_connected {
805   my $self = shift;
806
807   my $dbh = $self->_dbh
808     or return 0;
809
810   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
811     $self->_dbh(undef);
812     $self->{_dbh_gen}++;
813     return 0;
814   }
815   else {
816     $self->_verify_pid;
817     return 0 if !$self->_dbh;
818   }
819
820   return $dbh->FETCH('Active');
821 }
822
823 sub _ping {
824   my $self = shift;
825
826   my $dbh = $self->_dbh or return 0;
827
828   return $dbh->ping;
829 }
830
831 # handle pid changes correctly
832 #  NOTE: assumes $self->_dbh is a valid $dbh
833 sub _verify_pid {
834   my ($self) = @_;
835
836   return if defined $self->_conn_pid && $self->_conn_pid == $$;
837
838   $self->_dbh->{InactiveDestroy} = 1;
839   $self->_dbh(undef);
840   $self->{_dbh_gen}++;
841
842   return;
843 }
844
845 sub ensure_connected {
846   my ($self) = @_;
847
848   unless ($self->connected) {
849     $self->_populate_dbh;
850   }
851 }
852
853 =head2 dbh
854
855 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
856 is guaranteed to be healthy by implicitly calling L</connected>, and if
857 necessary performing a reconnection before returning. Keep in mind that this
858 is very B<expensive> on some database engines. Consider using L</dbh_do>
859 instead.
860
861 =cut
862
863 sub dbh {
864   my ($self) = @_;
865
866   if (not $self->_dbh) {
867     $self->_populate_dbh;
868   } else {
869     $self->ensure_connected;
870   }
871   return $self->_dbh;
872 }
873
874 # this is the internal "get dbh or connect (don't check)" method
875 sub _get_dbh {
876   my $self = shift;
877   $self->_verify_pid if $self->_dbh;
878   $self->_populate_dbh unless $self->_dbh;
879   return $self->_dbh;
880 }
881
882 sub _sql_maker_args {
883     my ($self) = @_;
884
885     return (
886       bindtype=>'columns',
887       array_datatypes => 1,
888       limit_dialect => $self->_get_dbh,
889       %{$self->_sql_maker_opts}
890     );
891 }
892
893 sub sql_maker {
894   my ($self) = @_;
895   unless ($self->_sql_maker) {
896     my $sql_maker_class = $self->sql_maker_class;
897     $self->ensure_class_loaded ($sql_maker_class);
898     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
899   }
900   return $self->_sql_maker;
901 }
902
903 # nothing to do by default
904 sub _rebless {}
905 sub _init {}
906
907 sub _populate_dbh {
908   my ($self) = @_;
909
910   my @info = @{$self->_dbi_connect_info || []};
911   $self->_dbh(undef); # in case ->connected failed we might get sent here
912   $self->_server_info_hash (undef);
913   $self->_dbh($self->_connect(@info));
914
915   $self->_conn_pid($$);
916   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
917
918   $self->_determine_driver;
919
920   # Always set the transaction depth on connect, since
921   #  there is no transaction in progress by definition
922   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
923
924   $self->_run_connection_actions unless $self->{_in_determine_driver};
925 }
926
927 sub _run_connection_actions {
928   my $self = shift;
929   my @actions;
930
931   push @actions, ( $self->on_connect_call || () );
932   push @actions, $self->_parse_connect_do ('on_connect_do');
933
934   $self->_do_connection_actions(connect_call_ => $_) for @actions;
935 }
936
937 sub _server_info {
938   my $self = shift;
939
940   unless ($self->_server_info_hash) {
941
942     my %info;
943
944     my $server_version = $self->_get_server_version;
945
946     if (defined $server_version) {
947       $info{dbms_version} = $server_version;
948
949       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
950       my @verparts = split (/\./, $numeric_version);
951       if (
952         @verparts
953           &&
954         $verparts[0] <= 999
955       ) {
956         # consider only up to 3 version parts, iff not more than 3 digits
957         my @use_parts;
958         while (@verparts && @use_parts < 3) {
959           my $p = shift @verparts;
960           last if $p > 999;
961           push @use_parts, $p;
962         }
963         push @use_parts, 0 while @use_parts < 3;
964
965         $info{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
966       }
967     }
968
969     $self->_server_info_hash(\%info);
970   }
971
972   return $self->_server_info_hash
973 }
974
975 sub _get_server_version {
976   eval { shift->_get_dbh->get_info(18) };
977 }
978
979 sub _determine_driver {
980   my ($self) = @_;
981
982   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
983     my $started_connected = 0;
984     local $self->{_in_determine_driver} = 1;
985
986     if (ref($self) eq __PACKAGE__) {
987       my $driver;
988       if ($self->_dbh) { # we are connected
989         $driver = $self->_dbh->{Driver}{Name};
990         $started_connected = 1;
991       } else {
992         # if connect_info is a CODEREF, we have no choice but to connect
993         if (ref $self->_dbi_connect_info->[0] &&
994             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
995           $self->_populate_dbh;
996           $driver = $self->_dbh->{Driver}{Name};
997         }
998         else {
999           # try to use dsn to not require being connected, the driver may still
1000           # force a connection in _rebless to determine version
1001           # (dsn may not be supplied at all if all we do is make a mock-schema)
1002           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1003           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1004           $driver ||= $ENV{DBI_DRIVER};
1005         }
1006       }
1007
1008       if ($driver) {
1009         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1010         if ($self->load_optional_class($storage_class)) {
1011           mro::set_mro($storage_class, 'c3');
1012           bless $self, $storage_class;
1013           $self->_rebless();
1014         }
1015       }
1016     }
1017
1018     $self->_driver_determined(1);
1019
1020     $self->_init; # run driver-specific initializations
1021
1022     $self->_run_connection_actions
1023         if !$started_connected && defined $self->_dbh;
1024   }
1025 }
1026
1027 sub _do_connection_actions {
1028   my $self          = shift;
1029   my $method_prefix = shift;
1030   my $call          = shift;
1031
1032   if (not ref($call)) {
1033     my $method = $method_prefix . $call;
1034     $self->$method(@_);
1035   } elsif (ref($call) eq 'CODE') {
1036     $self->$call(@_);
1037   } elsif (ref($call) eq 'ARRAY') {
1038     if (ref($call->[0]) ne 'ARRAY') {
1039       $self->_do_connection_actions($method_prefix, $_) for @$call;
1040     } else {
1041       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1042     }
1043   } else {
1044     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1045   }
1046
1047   return $self;
1048 }
1049
1050 sub connect_call_do_sql {
1051   my $self = shift;
1052   $self->_do_query(@_);
1053 }
1054
1055 sub disconnect_call_do_sql {
1056   my $self = shift;
1057   $self->_do_query(@_);
1058 }
1059
1060 # override in db-specific backend when necessary
1061 sub connect_call_datetime_setup { 1 }
1062
1063 sub _do_query {
1064   my ($self, $action) = @_;
1065
1066   if (ref $action eq 'CODE') {
1067     $action = $action->($self);
1068     $self->_do_query($_) foreach @$action;
1069   }
1070   else {
1071     # Most debuggers expect ($sql, @bind), so we need to exclude
1072     # the attribute hash which is the second argument to $dbh->do
1073     # furthermore the bind values are usually to be presented
1074     # as named arrayref pairs, so wrap those here too
1075     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1076     my $sql = shift @do_args;
1077     my $attrs = shift @do_args;
1078     my @bind = map { [ undef, $_ ] } @do_args;
1079
1080     $self->_query_start($sql, @bind);
1081     $self->_get_dbh->do($sql, $attrs, @do_args);
1082     $self->_query_end($sql, @bind);
1083   }
1084
1085   return $self;
1086 }
1087
1088 sub _connect {
1089   my ($self, @info) = @_;
1090
1091   $self->throw_exception("You failed to provide any connection info")
1092     if !@info;
1093
1094   my ($old_connect_via, $dbh);
1095
1096   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1097     $old_connect_via = $DBI::connect_via;
1098     $DBI::connect_via = 'connect';
1099   }
1100
1101   eval {
1102     if(ref $info[0] eq 'CODE') {
1103        $dbh = $info[0]->();
1104     }
1105     else {
1106        $dbh = DBI->connect(@info);
1107     }
1108
1109     if($dbh && !$self->unsafe) {
1110       my $weak_self = $self;
1111       Scalar::Util::weaken($weak_self);
1112       $dbh->{HandleError} = sub {
1113           if ($weak_self) {
1114             $weak_self->throw_exception("DBI Exception: $_[0]");
1115           }
1116           else {
1117             # the handler may be invoked by something totally out of
1118             # the scope of DBIC
1119             croak ("DBI Exception: $_[0]");
1120           }
1121       };
1122       $dbh->{ShowErrorStatement} = 1;
1123       $dbh->{RaiseError} = 1;
1124       $dbh->{PrintError} = 0;
1125     }
1126   };
1127
1128   $DBI::connect_via = $old_connect_via if $old_connect_via;
1129
1130   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1131     if !$dbh || $@;
1132
1133   $self->_dbh_autocommit($dbh->{AutoCommit});
1134
1135   $dbh;
1136 }
1137
1138 sub svp_begin {
1139   my ($self, $name) = @_;
1140
1141   $name = $self->_svp_generate_name
1142     unless defined $name;
1143
1144   $self->throw_exception ("You can't use savepoints outside a transaction")
1145     if $self->{transaction_depth} == 0;
1146
1147   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1148     unless $self->can('_svp_begin');
1149
1150   push @{ $self->{savepoints} }, $name;
1151
1152   $self->debugobj->svp_begin($name) if $self->debug;
1153
1154   return $self->_svp_begin($name);
1155 }
1156
1157 sub svp_release {
1158   my ($self, $name) = @_;
1159
1160   $self->throw_exception ("You can't use savepoints outside a transaction")
1161     if $self->{transaction_depth} == 0;
1162
1163   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1164     unless $self->can('_svp_release');
1165
1166   if (defined $name) {
1167     $self->throw_exception ("Savepoint '$name' does not exist")
1168       unless grep { $_ eq $name } @{ $self->{savepoints} };
1169
1170     # Dig through the stack until we find the one we are releasing.  This keeps
1171     # the stack up to date.
1172     my $svp;
1173
1174     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1175   } else {
1176     $name = pop @{ $self->{savepoints} };
1177   }
1178
1179   $self->debugobj->svp_release($name) if $self->debug;
1180
1181   return $self->_svp_release($name);
1182 }
1183
1184 sub svp_rollback {
1185   my ($self, $name) = @_;
1186
1187   $self->throw_exception ("You can't use savepoints outside a transaction")
1188     if $self->{transaction_depth} == 0;
1189
1190   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1191     unless $self->can('_svp_rollback');
1192
1193   if (defined $name) {
1194       # If they passed us a name, verify that it exists in the stack
1195       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1196           $self->throw_exception("Savepoint '$name' does not exist!");
1197       }
1198
1199       # Dig through the stack until we find the one we are releasing.  This keeps
1200       # the stack up to date.
1201       while(my $s = pop(@{ $self->{savepoints} })) {
1202           last if($s eq $name);
1203       }
1204       # Add the savepoint back to the stack, as a rollback doesn't remove the
1205       # named savepoint, only everything after it.
1206       push(@{ $self->{savepoints} }, $name);
1207   } else {
1208       # We'll assume they want to rollback to the last savepoint
1209       $name = $self->{savepoints}->[-1];
1210   }
1211
1212   $self->debugobj->svp_rollback($name) if $self->debug;
1213
1214   return $self->_svp_rollback($name);
1215 }
1216
1217 sub _svp_generate_name {
1218     my ($self) = @_;
1219
1220     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1221 }
1222
1223 sub txn_begin {
1224   my $self = shift;
1225
1226   # this means we have not yet connected and do not know the AC status
1227   # (e.g. coderef $dbh)
1228   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1229
1230   if($self->{transaction_depth} == 0) {
1231     $self->debugobj->txn_begin()
1232       if $self->debug;
1233     $self->_dbh_begin_work;
1234   }
1235   elsif ($self->auto_savepoint) {
1236     $self->svp_begin;
1237   }
1238   $self->{transaction_depth}++;
1239 }
1240
1241 sub _dbh_begin_work {
1242   my $self = shift;
1243
1244   # if the user is utilizing txn_do - good for him, otherwise we need to
1245   # ensure that the $dbh is healthy on BEGIN.
1246   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1247   # will be replaced by a failure of begin_work itself (which will be
1248   # then retried on reconnect)
1249   if ($self->{_in_dbh_do}) {
1250     $self->_dbh->begin_work;
1251   } else {
1252     $self->dbh_do(sub { $_[1]->begin_work });
1253   }
1254 }
1255
1256 sub txn_commit {
1257   my $self = shift;
1258   if ($self->{transaction_depth} == 1) {
1259     $self->debugobj->txn_commit()
1260       if ($self->debug);
1261     $self->_dbh_commit;
1262     $self->{transaction_depth} = 0
1263       if $self->_dbh_autocommit;
1264   }
1265   elsif($self->{transaction_depth} > 1) {
1266     $self->{transaction_depth}--;
1267     $self->svp_release
1268       if $self->auto_savepoint;
1269   }
1270 }
1271
1272 sub _dbh_commit {
1273   my $self = shift;
1274   my $dbh  = $self->_dbh
1275     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1276   $dbh->commit;
1277 }
1278
1279 sub txn_rollback {
1280   my $self = shift;
1281   my $dbh = $self->_dbh;
1282   eval {
1283     if ($self->{transaction_depth} == 1) {
1284       $self->debugobj->txn_rollback()
1285         if ($self->debug);
1286       $self->{transaction_depth} = 0
1287         if $self->_dbh_autocommit;
1288       $self->_dbh_rollback;
1289     }
1290     elsif($self->{transaction_depth} > 1) {
1291       $self->{transaction_depth}--;
1292       if ($self->auto_savepoint) {
1293         $self->svp_rollback;
1294         $self->svp_release;
1295       }
1296     }
1297     else {
1298       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1299     }
1300   };
1301   if ($@) {
1302     my $error = $@;
1303     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1304     $error =~ /$exception_class/ and $self->throw_exception($error);
1305     # ensure that a failed rollback resets the transaction depth
1306     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1307     $self->throw_exception($error);
1308   }
1309 }
1310
1311 sub _dbh_rollback {
1312   my $self = shift;
1313   my $dbh  = $self->_dbh
1314     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1315   $dbh->rollback;
1316 }
1317
1318 # This used to be the top-half of _execute.  It was split out to make it
1319 #  easier to override in NoBindVars without duping the rest.  It takes up
1320 #  all of _execute's args, and emits $sql, @bind.
1321 sub _prep_for_execute {
1322   my ($self, $op, $extra_bind, $ident, $args) = @_;
1323
1324   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1325     $ident = $ident->from();
1326   }
1327
1328   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1329
1330   unshift(@bind,
1331     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1332       if $extra_bind;
1333   return ($sql, \@bind);
1334 }
1335
1336
1337 sub _fix_bind_params {
1338     my ($self, @bind) = @_;
1339
1340     ### Turn @bind from something like this:
1341     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1342     ### to this:
1343     ###   ( "'1'", "'1'", "'3'" )
1344     return
1345         map {
1346             if ( defined( $_ && $_->[1] ) ) {
1347                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1348             }
1349             else { q{'NULL'}; }
1350         } @bind;
1351 }
1352
1353 sub _query_start {
1354     my ( $self, $sql, @bind ) = @_;
1355
1356     if ( $self->debug ) {
1357         @bind = $self->_fix_bind_params(@bind);
1358
1359         $self->debugobj->query_start( $sql, @bind );
1360     }
1361 }
1362
1363 sub _query_end {
1364     my ( $self, $sql, @bind ) = @_;
1365
1366     if ( $self->debug ) {
1367         @bind = $self->_fix_bind_params(@bind);
1368         $self->debugobj->query_end( $sql, @bind );
1369     }
1370 }
1371
1372 sub _dbh_execute {
1373   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1374
1375   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1376
1377   $self->_query_start( $sql, @$bind );
1378
1379   my $sth = $self->sth($sql,$op);
1380
1381   my $placeholder_index = 1;
1382
1383   foreach my $bound (@$bind) {
1384     my $attributes = {};
1385     my($column_name, @data) = @$bound;
1386
1387     if ($bind_attributes) {
1388       $attributes = $bind_attributes->{$column_name}
1389       if defined $bind_attributes->{$column_name};
1390     }
1391
1392     foreach my $data (@data) {
1393       my $ref = ref $data;
1394       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1395
1396       $sth->bind_param($placeholder_index, $data, $attributes);
1397       $placeholder_index++;
1398     }
1399   }
1400
1401   # Can this fail without throwing an exception anyways???
1402   my $rv = $sth->execute();
1403   $self->throw_exception($sth->errstr) if !$rv;
1404
1405   $self->_query_end( $sql, @$bind );
1406
1407   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1408 }
1409
1410 sub _execute {
1411     my $self = shift;
1412     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1413 }
1414
1415 sub _prefetch_insert_auto_nextvals {
1416   my ($self, $source, $to_insert) = @_;
1417
1418   my $upd = {};
1419
1420   foreach my $col ( $source->columns ) {
1421     if ( !defined $to_insert->{$col} ) {
1422       my $col_info = $source->column_info($col);
1423
1424       if ( $col_info->{auto_nextval} ) {
1425         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1426           'nextval',
1427           $col_info->{sequence} ||=
1428             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1429         );
1430       }
1431     }
1432   }
1433
1434   return $upd;
1435 }
1436
1437 sub insert {
1438   my $self = shift;
1439   my ($source, $to_insert, $opts) = @_;
1440
1441   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1442
1443   my $bind_attributes = $self->source_bind_attributes($source);
1444
1445   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1446
1447   if ($opts->{returning}) {
1448     my @ret_cols = @{$opts->{returning}};
1449
1450     my @ret_vals = eval {
1451       local $SIG{__WARN__} = sub {};
1452       my @r = $sth->fetchrow_array;
1453       $sth->finish;
1454       @r;
1455     };
1456
1457     my %ret;
1458     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1459
1460     $updated_cols = {
1461       %$updated_cols,
1462       %ret,
1463     };
1464   }
1465
1466   return $updated_cols;
1467 }
1468
1469 ## Currently it is assumed that all values passed will be "normal", i.e. not
1470 ## scalar refs, or at least, all the same type as the first set, the statement is
1471 ## only prepped once.
1472 sub insert_bulk {
1473   my ($self, $source, $cols, $data) = @_;
1474
1475   my %colvalues;
1476   @colvalues{@$cols} = (0..$#$cols);
1477
1478   for my $i (0..$#$cols) {
1479     my $first_val = $data->[0][$i];
1480     next unless ref $first_val eq 'SCALAR';
1481
1482     $colvalues{ $cols->[$i] } = $first_val;
1483   }
1484
1485   # check for bad data and stringify stringifiable objects
1486   my $bad_slice = sub {
1487     my ($msg, $col_idx, $slice_idx) = @_;
1488     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1489       $msg,
1490       $cols->[$col_idx],
1491       do {
1492         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1493         Data::Dumper::Concise::Dumper({
1494           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1495         }),
1496       }
1497     );
1498   };
1499
1500   for my $datum_idx (0..$#$data) {
1501     my $datum = $data->[$datum_idx];
1502
1503     for my $col_idx (0..$#$cols) {
1504       my $val            = $datum->[$col_idx];
1505       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1506       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1507
1508       if ($is_literal_sql) {
1509         if (not ref $val) {
1510           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1511         }
1512         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1513           $bad_slice->("$reftype reference found where literal SQL expected",
1514             $col_idx, $datum_idx);
1515         }
1516         elsif ($$val ne $$sqla_bind){
1517           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1518             $col_idx, $datum_idx);
1519         }
1520       }
1521       elsif (my $reftype = ref $val) {
1522         require overload;
1523         if (overload::Method($val, '""')) {
1524           $datum->[$col_idx] = "".$val;
1525         }
1526         else {
1527           $bad_slice->("$reftype reference found where bind expected",
1528             $col_idx, $datum_idx);
1529         }
1530       }
1531     }
1532   }
1533
1534   my ($sql, $bind) = $self->_prep_for_execute (
1535     'insert', undef, $source, [\%colvalues]
1536   );
1537   my @bind = @$bind;
1538
1539   my $empty_bind = 1 if (not @bind) &&
1540     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1541
1542   if ((not @bind) && (not $empty_bind)) {
1543     $self->throw_exception(
1544       'Cannot insert_bulk without support for placeholders'
1545     );
1546   }
1547
1548   # neither _execute_array, nor _execute_inserts_with_no_binds are
1549   # atomic (even if _execute _array is a single call). Thus a safety
1550   # scope guard
1551   my $guard = $self->txn_scope_guard;
1552
1553   $self->_query_start( $sql, ['__BULK__'] );
1554   my $sth = $self->sth($sql);
1555   my $rv = do {
1556     if ($empty_bind) {
1557       # bind_param_array doesn't work if there are no binds
1558       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1559     }
1560     else {
1561 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1562       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1563     }
1564   };
1565
1566   $self->_query_end( $sql, ['__BULK__'] );
1567
1568   $guard->commit;
1569
1570   return (wantarray ? ($rv, $sth, @bind) : $rv);
1571 }
1572
1573 sub _execute_array {
1574   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1575
1576   ## This must be an arrayref, else nothing works!
1577   my $tuple_status = [];
1578
1579   ## Get the bind_attributes, if any exist
1580   my $bind_attributes = $self->source_bind_attributes($source);
1581
1582   ## Bind the values and execute
1583   my $placeholder_index = 1;
1584
1585   foreach my $bound (@$bind) {
1586
1587     my $attributes = {};
1588     my ($column_name, $data_index) = @$bound;
1589
1590     if( $bind_attributes ) {
1591       $attributes = $bind_attributes->{$column_name}
1592       if defined $bind_attributes->{$column_name};
1593     }
1594
1595     my @data = map { $_->[$data_index] } @$data;
1596
1597     $sth->bind_param_array(
1598       $placeholder_index,
1599       [@data],
1600       (%$attributes ?  $attributes : ()),
1601     );
1602     $placeholder_index++;
1603   }
1604
1605   my $rv = eval {
1606     $self->_dbh_execute_array($sth, $tuple_status, @extra);
1607   };
1608   my $err = $@ || $sth->errstr;
1609
1610 # Statement must finish even if there was an exception.
1611   eval { $sth->finish };
1612   $err = $@ unless $err;
1613
1614   if ($err) {
1615     my $i = 0;
1616     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1617
1618     $self->throw_exception("Unexpected populate error: $err")
1619       if ($i > $#$tuple_status);
1620
1621     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1622       ($tuple_status->[$i][1] || $err),
1623       Data::Dumper::Concise::Dumper({
1624         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1625       }),
1626     );
1627   }
1628   return $rv;
1629 }
1630
1631 sub _dbh_execute_array {
1632     my ($self, $sth, $tuple_status, @extra) = @_;
1633
1634     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1635 }
1636
1637 sub _dbh_execute_inserts_with_no_binds {
1638   my ($self, $sth, $count) = @_;
1639
1640   eval {
1641     my $dbh = $self->_get_dbh;
1642     local $dbh->{RaiseError} = 1;
1643     local $dbh->{PrintError} = 0;
1644
1645     $sth->execute foreach 1..$count;
1646   };
1647   my $exception = $@;
1648
1649 # Make sure statement is finished even if there was an exception.
1650   eval { $sth->finish };
1651   $exception = $@ unless $exception;
1652
1653   $self->throw_exception($exception) if $exception;
1654
1655   return $count;
1656 }
1657
1658 sub update {
1659   my ($self, $source, @args) = @_;
1660
1661   my $bind_attrs = $self->source_bind_attributes($source);
1662
1663   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1664 }
1665
1666
1667 sub delete {
1668   my ($self, $source, @args) = @_;
1669
1670   my $bind_attrs = $self->source_bind_attributes($source);
1671
1672   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1673 }
1674
1675 # We were sent here because the $rs contains a complex search
1676 # which will require a subquery to select the correct rows
1677 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1678 #
1679 # Generating a single PK column subquery is trivial and supported
1680 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1681 # Look at _multipk_update_delete()
1682 sub _subq_update_delete {
1683   my $self = shift;
1684   my ($rs, $op, $values) = @_;
1685
1686   my $rsrc = $rs->result_source;
1687
1688   # quick check if we got a sane rs on our hands
1689   my @pcols = $rsrc->_pri_cols;
1690
1691   my $sel = $rs->_resolved_attrs->{select};
1692   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1693
1694   if (
1695       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1696         ne
1697       join ("\x00", sort @$sel )
1698   ) {
1699     $self->throw_exception (
1700       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1701     );
1702   }
1703
1704   if (@pcols == 1) {
1705     return $self->$op (
1706       $rsrc,
1707       $op eq 'update' ? $values : (),
1708       { $pcols[0] => { -in => $rs->as_query } },
1709     );
1710   }
1711
1712   else {
1713     return $self->_multipk_update_delete (@_);
1714   }
1715 }
1716
1717 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1718 # resultset update/delete involving subqueries. So by default resort
1719 # to simple (and inefficient) delete_all style per-row opearations,
1720 # while allowing specific storages to override this with a faster
1721 # implementation.
1722 #
1723 sub _multipk_update_delete {
1724   return shift->_per_row_update_delete (@_);
1725 }
1726
1727 # This is the default loop used to delete/update rows for multi PK
1728 # resultsets, and used by mysql exclusively (because it can't do anything
1729 # else).
1730 #
1731 # We do not use $row->$op style queries, because resultset update/delete
1732 # is not expected to cascade (this is what delete_all/update_all is for).
1733 #
1734 # There should be no race conditions as the entire operation is rolled
1735 # in a transaction.
1736 #
1737 sub _per_row_update_delete {
1738   my $self = shift;
1739   my ($rs, $op, $values) = @_;
1740
1741   my $rsrc = $rs->result_source;
1742   my @pcols = $rsrc->_pri_cols;
1743
1744   my $guard = $self->txn_scope_guard;
1745
1746   # emulate the return value of $sth->execute for non-selects
1747   my $row_cnt = '0E0';
1748
1749   my $subrs_cur = $rs->cursor;
1750   my @all_pk = $subrs_cur->all;
1751   for my $pks ( @all_pk) {
1752
1753     my $cond;
1754     for my $i (0.. $#pcols) {
1755       $cond->{$pcols[$i]} = $pks->[$i];
1756     }
1757
1758     $self->$op (
1759       $rsrc,
1760       $op eq 'update' ? $values : (),
1761       $cond,
1762     );
1763
1764     $row_cnt++;
1765   }
1766
1767   $guard->commit;
1768
1769   return $row_cnt;
1770 }
1771
1772 sub _select {
1773   my $self = shift;
1774
1775   # localization is neccessary as
1776   # 1) there is no infrastructure to pass this around before SQLA2
1777   # 2) _select_args sets it and _prep_for_execute consumes it
1778   my $sql_maker = $self->sql_maker;
1779   local $sql_maker->{_dbic_rs_attrs};
1780
1781   return $self->_execute($self->_select_args(@_));
1782 }
1783
1784 sub _select_args_to_query {
1785   my $self = shift;
1786
1787   # localization is neccessary as
1788   # 1) there is no infrastructure to pass this around before SQLA2
1789   # 2) _select_args sets it and _prep_for_execute consumes it
1790   my $sql_maker = $self->sql_maker;
1791   local $sql_maker->{_dbic_rs_attrs};
1792
1793   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1794   #  = $self->_select_args($ident, $select, $cond, $attrs);
1795   my ($op, $bind, $ident, $bind_attrs, @args) =
1796     $self->_select_args(@_);
1797
1798   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1799   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1800   $prepared_bind ||= [];
1801
1802   return wantarray
1803     ? ($sql, $prepared_bind, $bind_attrs)
1804     : \[ "($sql)", @$prepared_bind ]
1805   ;
1806 }
1807
1808 sub _select_args {
1809   my ($self, $ident, $select, $where, $attrs) = @_;
1810
1811   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1812
1813   my $sql_maker = $self->sql_maker;
1814   $sql_maker->{_dbic_rs_attrs} = {
1815     %$attrs,
1816     select => $select,
1817     from => $ident,
1818     where => $where,
1819     $rs_alias && $alias2source->{$rs_alias}
1820       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1821       : ()
1822     ,
1823   };
1824
1825   # calculate bind_attrs before possible $ident mangling
1826   my $bind_attrs = {};
1827   for my $alias (keys %$alias2source) {
1828     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1829     for my $col (keys %$bindtypes) {
1830
1831       my $fqcn = join ('.', $alias, $col);
1832       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1833
1834       # Unqialified column names are nice, but at the same time can be
1835       # rather ambiguous. What we do here is basically go along with
1836       # the loop, adding an unqualified column slot to $bind_attrs,
1837       # alongside the fully qualified name. As soon as we encounter
1838       # another column by that name (which would imply another table)
1839       # we unset the unqualified slot and never add any info to it
1840       # to avoid erroneous type binding. If this happens the users
1841       # only choice will be to fully qualify his column name
1842
1843       if (exists $bind_attrs->{$col}) {
1844         $bind_attrs->{$col} = {};
1845       }
1846       else {
1847         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1848       }
1849     }
1850   }
1851
1852   # adjust limits
1853   if (
1854     $attrs->{software_limit}
1855       ||
1856     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1857   ) {
1858     $attrs->{software_limit} = 1;
1859   }
1860   else {
1861     $self->throw_exception("rows attribute must be positive if present")
1862       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1863
1864     # MySQL actually recommends this approach.  I cringe.
1865     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1866   }
1867
1868   my @limit;
1869
1870   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
1871   # storage, unless software limit was requested
1872   if (
1873     #limited has_many
1874     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1875        ||
1876     # limited prefetch with RNO subqueries (otherwise a risk of column name clashes)
1877     (
1878       $attrs->{rows}
1879         &&
1880       $sql_maker->limit_dialect eq 'RowNumberOver'
1881         &&
1882       $attrs->{_prefetch_select}
1883         &&
1884       @{$attrs->{_prefetch_select}}
1885     )
1886       ||
1887     # grouped prefetch (to satisfy group_by == select)
1888     ( $attrs->{group_by}
1889         &&
1890       @{$attrs->{group_by}}
1891         &&
1892       $attrs->{_prefetch_select}
1893         &&
1894       @{$attrs->{_prefetch_select}}
1895     )
1896   ) {
1897     ($ident, $select, $where, $attrs)
1898       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1899   }
1900
1901   elsif (
1902     # the RNO limit dialect mangles the SQL such that the join gets lost
1903     # wrap a subquery here
1904     ($attrs->{rows} || $attrs->{offset})
1905       &&
1906     $sql_maker->limit_dialect eq 'RowNumberOver'
1907       &&
1908     (ref $ident eq 'ARRAY' && @$ident > 1)  # indicates a join
1909       &&
1910     scalar $self->_parse_order_by ($attrs->{order_by})
1911   ) {
1912
1913     push @limit, delete @{$attrs}{qw/rows offset/};
1914
1915     my $subq = $self->_select_args_to_query (
1916       $ident,
1917       $select,
1918       $where,
1919       $attrs,
1920     );
1921
1922     $ident = {
1923       -alias => $attrs->{alias},
1924       -source_handle => $ident->[0]{-source_handle},
1925       $attrs->{alias} => $subq,
1926     };
1927
1928     # all part of the subquery now
1929     delete @{$attrs}{qw/order_by group_by having/};
1930     $where = undef;
1931   }
1932
1933   elsif (! $attrs->{software_limit} ) {
1934     push @limit, $attrs->{rows}, $attrs->{offset};
1935   }
1936
1937   # try to simplify the joinmap further (prune unreferenced type-single joins)
1938   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
1939
1940 ###
1941   # This would be the point to deflate anything found in $where
1942   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1943   # expect a row object. And all we have is a resultsource (it is trivial
1944   # to extract deflator coderefs via $alias2source above).
1945   #
1946   # I don't see a way forward other than changing the way deflators are
1947   # invoked, and that's just bad...
1948 ###
1949
1950   my $order = { map
1951     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1952     (qw/order_by group_by having/ )
1953   };
1954
1955   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1956 }
1957
1958 # Returns a counting SELECT for a simple count
1959 # query. Abstracted so that a storage could override
1960 # this to { count => 'firstcol' } or whatever makes
1961 # sense as a performance optimization
1962 sub _count_select {
1963   #my ($self, $source, $rs_attrs) = @_;
1964   return { count => '*' };
1965 }
1966
1967 # Returns a SELECT which will end up in the subselect
1968 # There may or may not be a group_by, as the subquery
1969 # might have been called to accomodate a limit
1970 #
1971 # Most databases would be happy with whatever ends up
1972 # here, but some choke in various ways.
1973 #
1974 sub _subq_count_select {
1975   my ($self, $source, $rs_attrs) = @_;
1976
1977   if (my $groupby = $rs_attrs->{group_by}) {
1978
1979     my $avail_columns = $self->_resolve_column_info ($rs_attrs->{from});
1980
1981     my $sel_index;
1982     for my $sel (@{$rs_attrs->{select}}) {
1983       if (ref $sel eq 'HASH' and $sel->{-as}) {
1984         $sel_index->{$sel->{-as}} = $sel;
1985       }
1986     }
1987
1988     my @selection;
1989     for my $g_part (@$groupby) {
1990       if (ref $g_part or $avail_columns->{$g_part}) {
1991         push @selection, $g_part;
1992       }
1993       elsif ($sel_index->{$g_part}) {
1994         push @selection, $sel_index->{$g_part};
1995       }
1996       else {
1997         $self->throw_exception ("group_by criteria '$g_part' not contained within current resultset source(s)");
1998       }
1999     }
2000
2001     return \@selection;
2002   }
2003
2004   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
2005   return @pcols ? \@pcols : [ 1 ];
2006 }
2007
2008 sub source_bind_attributes {
2009   my ($self, $source) = @_;
2010
2011   my $bind_attributes;
2012   foreach my $column ($source->columns) {
2013
2014     my $data_type = $source->column_info($column)->{data_type} || '';
2015     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2016      if $data_type;
2017   }
2018
2019   return $bind_attributes;
2020 }
2021
2022 =head2 select
2023
2024 =over 4
2025
2026 =item Arguments: $ident, $select, $condition, $attrs
2027
2028 =back
2029
2030 Handle a SQL select statement.
2031
2032 =cut
2033
2034 sub select {
2035   my $self = shift;
2036   my ($ident, $select, $condition, $attrs) = @_;
2037   return $self->cursor_class->new($self, \@_, $attrs);
2038 }
2039
2040 sub select_single {
2041   my $self = shift;
2042   my ($rv, $sth, @bind) = $self->_select(@_);
2043   my @row = $sth->fetchrow_array;
2044   my @nextrow = $sth->fetchrow_array if @row;
2045   if(@row && @nextrow) {
2046     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2047   }
2048   # Need to call finish() to work round broken DBDs
2049   $sth->finish();
2050   return @row;
2051 }
2052
2053 =head2 sth
2054
2055 =over 4
2056
2057 =item Arguments: $sql
2058
2059 =back
2060
2061 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2062
2063 =cut
2064
2065 sub _dbh_sth {
2066   my ($self, $dbh, $sql) = @_;
2067
2068   # 3 is the if_active parameter which avoids active sth re-use
2069   my $sth = $self->disable_sth_caching
2070     ? $dbh->prepare($sql)
2071     : $dbh->prepare_cached($sql, {}, 3);
2072
2073   # XXX You would think RaiseError would make this impossible,
2074   #  but apparently that's not true :(
2075   $self->throw_exception($dbh->errstr) if !$sth;
2076
2077   $sth;
2078 }
2079
2080 sub sth {
2081   my ($self, $sql) = @_;
2082   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2083 }
2084
2085 sub _dbh_columns_info_for {
2086   my ($self, $dbh, $table) = @_;
2087
2088   if ($dbh->can('column_info')) {
2089     my %result;
2090     eval {
2091       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2092       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2093       $sth->execute();
2094       while ( my $info = $sth->fetchrow_hashref() ){
2095         my %column_info;
2096         $column_info{data_type}   = $info->{TYPE_NAME};
2097         $column_info{size}      = $info->{COLUMN_SIZE};
2098         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2099         $column_info{default_value} = $info->{COLUMN_DEF};
2100         my $col_name = $info->{COLUMN_NAME};
2101         $col_name =~ s/^\"(.*)\"$/$1/;
2102
2103         $result{$col_name} = \%column_info;
2104       }
2105     };
2106     return \%result if !$@ && scalar keys %result;
2107   }
2108
2109   my %result;
2110   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2111   $sth->execute;
2112   my @columns = @{$sth->{NAME_lc}};
2113   for my $i ( 0 .. $#columns ){
2114     my %column_info;
2115     $column_info{data_type} = $sth->{TYPE}->[$i];
2116     $column_info{size} = $sth->{PRECISION}->[$i];
2117     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2118
2119     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2120       $column_info{data_type} = $1;
2121       $column_info{size}    = $2;
2122     }
2123
2124     $result{$columns[$i]} = \%column_info;
2125   }
2126   $sth->finish;
2127
2128   foreach my $col (keys %result) {
2129     my $colinfo = $result{$col};
2130     my $type_num = $colinfo->{data_type};
2131     my $type_name;
2132     if(defined $type_num && $dbh->can('type_info')) {
2133       my $type_info = $dbh->type_info($type_num);
2134       $type_name = $type_info->{TYPE_NAME} if $type_info;
2135       $colinfo->{data_type} = $type_name if $type_name;
2136     }
2137   }
2138
2139   return \%result;
2140 }
2141
2142 sub columns_info_for {
2143   my ($self, $table) = @_;
2144   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2145 }
2146
2147 =head2 last_insert_id
2148
2149 Return the row id of the last insert.
2150
2151 =cut
2152
2153 sub _dbh_last_insert_id {
2154     my ($self, $dbh, $source, $col) = @_;
2155
2156     my $id = eval { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2157
2158     return $id if defined $id;
2159
2160     my $class = ref $self;
2161     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2162 }
2163
2164 sub last_insert_id {
2165   my $self = shift;
2166   $self->_dbh_last_insert_id ($self->_dbh, @_);
2167 }
2168
2169 =head2 _native_data_type
2170
2171 =over 4
2172
2173 =item Arguments: $type_name
2174
2175 =back
2176
2177 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2178 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2179 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2180
2181 The default implementation returns C<undef>, implement in your Storage driver if
2182 you need this functionality.
2183
2184 Should map types from other databases to the native RDBMS type, for example
2185 C<VARCHAR2> to C<VARCHAR>.
2186
2187 Types with modifiers should map to the underlying data type. For example,
2188 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2189
2190 Composite types should map to the container type, for example
2191 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2192
2193 =cut
2194
2195 sub _native_data_type {
2196   #my ($self, $data_type) = @_;
2197   return undef
2198 }
2199
2200 # Check if placeholders are supported at all
2201 sub _placeholders_supported {
2202   my $self = shift;
2203   my $dbh  = $self->_get_dbh;
2204
2205   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2206   # but it is inaccurate more often than not
2207   eval {
2208     local $dbh->{PrintError} = 0;
2209     local $dbh->{RaiseError} = 1;
2210     $dbh->do('select ?', {}, 1);
2211   };
2212   return $@ ? 0 : 1;
2213 }
2214
2215 # Check if placeholders bound to non-string types throw exceptions
2216 #
2217 sub _typeless_placeholders_supported {
2218   my $self = shift;
2219   my $dbh  = $self->_get_dbh;
2220
2221   eval {
2222     local $dbh->{PrintError} = 0;
2223     local $dbh->{RaiseError} = 1;
2224     # this specifically tests a bind that is NOT a string
2225     $dbh->do('select 1 where 1 = ?', {}, 1);
2226   };
2227   return $@ ? 0 : 1;
2228 }
2229
2230 =head2 sqlt_type
2231
2232 Returns the database driver name.
2233
2234 =cut
2235
2236 sub sqlt_type {
2237   shift->_get_dbh->{Driver}->{Name};
2238 }
2239
2240 =head2 bind_attribute_by_data_type
2241
2242 Given a datatype from column info, returns a database specific bind
2243 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2244 let the database planner just handle it.
2245
2246 Generally only needed for special case column types, like bytea in postgres.
2247
2248 =cut
2249
2250 sub bind_attribute_by_data_type {
2251     return;
2252 }
2253
2254 =head2 is_datatype_numeric
2255
2256 Given a datatype from column_info, returns a boolean value indicating if
2257 the current RDBMS considers it a numeric value. This controls how
2258 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2259 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2260 be performed instead of the usual C<eq>.
2261
2262 =cut
2263
2264 sub is_datatype_numeric {
2265   my ($self, $dt) = @_;
2266
2267   return 0 unless $dt;
2268
2269   return $dt =~ /^ (?:
2270     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2271   ) $/ix;
2272 }
2273
2274
2275 =head2 create_ddl_dir
2276
2277 =over 4
2278
2279 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2280
2281 =back
2282
2283 Creates a SQL file based on the Schema, for each of the specified
2284 database engines in C<\@databases> in the given directory.
2285 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2286
2287 Given a previous version number, this will also create a file containing
2288 the ALTER TABLE statements to transform the previous schema into the
2289 current one. Note that these statements may contain C<DROP TABLE> or
2290 C<DROP COLUMN> statements that can potentially destroy data.
2291
2292 The file names are created using the C<ddl_filename> method below, please
2293 override this method in your schema if you would like a different file
2294 name format. For the ALTER file, the same format is used, replacing
2295 $version in the name with "$preversion-$version".
2296
2297 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2298 The most common value for this would be C<< { add_drop_table => 1 } >>
2299 to have the SQL produced include a C<DROP TABLE> statement for each table
2300 created. For quoting purposes supply C<quote_table_names> and
2301 C<quote_field_names>.
2302
2303 If no arguments are passed, then the following default values are assumed:
2304
2305 =over 4
2306
2307 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2308
2309 =item version    - $schema->schema_version
2310
2311 =item directory  - './'
2312
2313 =item preversion - <none>
2314
2315 =back
2316
2317 By default, C<\%sqlt_args> will have
2318
2319  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2320
2321 merged with the hash passed in. To disable any of those features, pass in a
2322 hashref like the following
2323
2324  { ignore_constraint_names => 0, # ... other options }
2325
2326
2327 WARNING: You are strongly advised to check all SQL files created, before applying
2328 them.
2329
2330 =cut
2331
2332 sub create_ddl_dir {
2333   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2334
2335   unless ($dir) {
2336     carp "No directory given, using ./\n";
2337     $dir = './';
2338   } else {
2339       -d $dir or File::Path::mkpath($dir)
2340           or $self->throw_exception("create_ddl_dir: $! creating dir '$dir'");
2341   }
2342
2343   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2344
2345   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2346   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2347
2348   my $schema_version = $schema->schema_version || '1.x';
2349   $version ||= $schema_version;
2350
2351   $sqltargs = {
2352     add_drop_table => 1,
2353     ignore_constraint_names => 1,
2354     ignore_index_names => 1,
2355     %{$sqltargs || {}}
2356   };
2357
2358   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2359     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2360   }
2361
2362   my $sqlt = SQL::Translator->new( $sqltargs );
2363
2364   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2365   my $sqlt_schema = $sqlt->translate({ data => $schema })
2366     or $self->throw_exception ($sqlt->error);
2367
2368   foreach my $db (@$databases) {
2369     $sqlt->reset();
2370     $sqlt->{schema} = $sqlt_schema;
2371     $sqlt->producer($db);
2372
2373     my $file;
2374     my $filename = $schema->ddl_filename($db, $version, $dir);
2375     if (-e $filename && ($version eq $schema_version )) {
2376       # if we are dumping the current version, overwrite the DDL
2377       carp "Overwriting existing DDL file - $filename";
2378       unlink($filename);
2379     }
2380
2381     my $output = $sqlt->translate;
2382     if(!$output) {
2383       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2384       next;
2385     }
2386     if(!open($file, ">$filename")) {
2387       $self->throw_exception("Can't open $filename for writing ($!)");
2388       next;
2389     }
2390     print $file $output;
2391     close($file);
2392
2393     next unless ($preversion);
2394
2395     require SQL::Translator::Diff;
2396
2397     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2398     if(!-e $prefilename) {
2399       carp("No previous schema file found ($prefilename)");
2400       next;
2401     }
2402
2403     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2404     if(-e $difffile) {
2405       carp("Overwriting existing diff file - $difffile");
2406       unlink($difffile);
2407     }
2408
2409     my $source_schema;
2410     {
2411       my $t = SQL::Translator->new($sqltargs);
2412       $t->debug( 0 );
2413       $t->trace( 0 );
2414
2415       $t->parser( $db )
2416         or $self->throw_exception ($t->error);
2417
2418       my $out = $t->translate( $prefilename )
2419         or $self->throw_exception ($t->error);
2420
2421       $source_schema = $t->schema;
2422
2423       $source_schema->name( $prefilename )
2424         unless ( $source_schema->name );
2425     }
2426
2427     # The "new" style of producers have sane normalization and can support
2428     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2429     # And we have to diff parsed SQL against parsed SQL.
2430     my $dest_schema = $sqlt_schema;
2431
2432     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2433       my $t = SQL::Translator->new($sqltargs);
2434       $t->debug( 0 );
2435       $t->trace( 0 );
2436
2437       $t->parser( $db )
2438         or $self->throw_exception ($t->error);
2439
2440       my $out = $t->translate( $filename )
2441         or $self->throw_exception ($t->error);
2442
2443       $dest_schema = $t->schema;
2444
2445       $dest_schema->name( $filename )
2446         unless $dest_schema->name;
2447     }
2448
2449     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2450                                                   $dest_schema,   $db,
2451                                                   $sqltargs
2452                                                  );
2453     if(!open $file, ">$difffile") {
2454       $self->throw_exception("Can't write to $difffile ($!)");
2455       next;
2456     }
2457     print $file $diff;
2458     close($file);
2459   }
2460 }
2461
2462 =head2 deployment_statements
2463
2464 =over 4
2465
2466 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2467
2468 =back
2469
2470 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2471
2472 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2473 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2474
2475 C<$directory> is used to return statements from files in a previously created
2476 L</create_ddl_dir> directory and is optional. The filenames are constructed
2477 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2478
2479 If no C<$directory> is specified then the statements are constructed on the
2480 fly using L<SQL::Translator> and C<$version> is ignored.
2481
2482 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2483
2484 =cut
2485
2486 sub deployment_statements {
2487   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2488   $type ||= $self->sqlt_type;
2489   $version ||= $schema->schema_version || '1.x';
2490   $dir ||= './';
2491   my $filename = $schema->ddl_filename($type, $version, $dir);
2492   if(-f $filename)
2493   {
2494       my $file;
2495       open($file, "<$filename")
2496         or $self->throw_exception("Can't open $filename ($!)");
2497       my @rows = <$file>;
2498       close($file);
2499       return join('', @rows);
2500   }
2501
2502   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2503     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2504   }
2505
2506   # sources needs to be a parser arg, but for simplicty allow at top level
2507   # coming in
2508   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2509       if exists $sqltargs->{sources};
2510
2511   my $tr = SQL::Translator->new(
2512     producer => "SQL::Translator::Producer::${type}",
2513     %$sqltargs,
2514     parser => 'SQL::Translator::Parser::DBIx::Class',
2515     data => $schema,
2516   );
2517
2518   my @ret;
2519   my $wa = wantarray;
2520   if ($wa) {
2521     @ret = $tr->translate;
2522   }
2523   else {
2524     $ret[0] = $tr->translate;
2525   }
2526
2527   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2528     unless (@ret && defined $ret[0]);
2529
2530   return $wa ? @ret : $ret[0];
2531 }
2532
2533 sub deploy {
2534   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2535   my $deploy = sub {
2536     my $line = shift;
2537     return if($line =~ /^--/);
2538     return if(!$line);
2539     # next if($line =~ /^DROP/m);
2540     return if($line =~ /^BEGIN TRANSACTION/m);
2541     return if($line =~ /^COMMIT/m);
2542     return if $line =~ /^\s+$/; # skip whitespace only
2543     $self->_query_start($line);
2544     eval {
2545       # do a dbh_do cycle here, as we need some error checking in
2546       # place (even though we will ignore errors)
2547       $self->dbh_do (sub { $_[1]->do($line) });
2548     };
2549     if ($@) {
2550       carp qq{$@ (running "${line}")};
2551     }
2552     $self->_query_end($line);
2553   };
2554   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2555   if (@statements > 1) {
2556     foreach my $statement (@statements) {
2557       $deploy->( $statement );
2558     }
2559   }
2560   elsif (@statements == 1) {
2561     foreach my $line ( split(";\n", $statements[0])) {
2562       $deploy->( $line );
2563     }
2564   }
2565 }
2566
2567 =head2 datetime_parser
2568
2569 Returns the datetime parser class
2570
2571 =cut
2572
2573 sub datetime_parser {
2574   my $self = shift;
2575   return $self->{datetime_parser} ||= do {
2576     $self->build_datetime_parser(@_);
2577   };
2578 }
2579
2580 =head2 datetime_parser_type
2581
2582 Defines (returns) the datetime parser class - currently hardwired to
2583 L<DateTime::Format::MySQL>
2584
2585 =cut
2586
2587 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2588
2589 =head2 build_datetime_parser
2590
2591 See L</datetime_parser>
2592
2593 =cut
2594
2595 sub build_datetime_parser {
2596   my $self = shift;
2597   my $type = $self->datetime_parser_type(@_);
2598   $self->ensure_class_loaded ($type);
2599   return $type;
2600 }
2601
2602
2603 =head2 is_replicating
2604
2605 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2606 replicate from a master database.  Default is undef, which is the result
2607 returned by databases that don't support replication.
2608
2609 =cut
2610
2611 sub is_replicating {
2612     return;
2613
2614 }
2615
2616 =head2 lag_behind_master
2617
2618 Returns a number that represents a certain amount of lag behind a master db
2619 when a given storage is replicating.  The number is database dependent, but
2620 starts at zero and increases with the amount of lag. Default in undef
2621
2622 =cut
2623
2624 sub lag_behind_master {
2625     return;
2626 }
2627
2628 =head2 relname_to_table_alias
2629
2630 =over 4
2631
2632 =item Arguments: $relname, $join_count
2633
2634 =back
2635
2636 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2637 queries.
2638
2639 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2640 way these aliases are named.
2641
2642 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2643 otherwise C<"$relname">.
2644
2645 =cut
2646
2647 sub relname_to_table_alias {
2648   my ($self, $relname, $join_count) = @_;
2649
2650   my $alias = ($join_count && $join_count > 1 ?
2651     join('_', $relname, $join_count) : $relname);
2652
2653   return $alias;
2654 }
2655
2656 sub DESTROY {
2657   my $self = shift;
2658
2659   $self->_verify_pid if $self->_dbh;
2660
2661   # some databases need this to stop spewing warnings
2662   if (my $dbh = $self->_dbh) {
2663     local $@;
2664     eval {
2665       %{ $dbh->{CachedKids} } = ();
2666       $dbh->disconnect;
2667     };
2668   }
2669
2670   $self->_dbh(undef);
2671 }
2672
2673 1;
2674
2675 =head1 USAGE NOTES
2676
2677 =head2 DBIx::Class and AutoCommit
2678
2679 DBIx::Class can do some wonderful magic with handling exceptions,
2680 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2681 (the default) combined with C<txn_do> for transaction support.
2682
2683 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2684 in an assumed transaction between commits, and you're telling us you'd
2685 like to manage that manually.  A lot of the magic protections offered by
2686 this module will go away.  We can't protect you from exceptions due to database
2687 disconnects because we don't know anything about how to restart your
2688 transactions.  You're on your own for handling all sorts of exceptional
2689 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2690 be with raw DBI.
2691
2692
2693 =head1 AUTHORS
2694
2695 Matt S. Trout <mst@shadowcatsystems.co.uk>
2696
2697 Andy Grundman <andy@hybridized.org>
2698
2699 =head1 LICENSE
2700
2701 You may distribute this code under the same terms as Perl itself.
2702
2703 =cut